Skip to content

Commit

Permalink
chore: update processor logic to accomodate jobRunID aborts
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Oct 6, 2023
1 parent 523f7f4 commit 90fdddf
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 24 deletions.
52 changes: 45 additions & 7 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ type Handle struct {
eventSchemaV2Enabled bool
archivalEnabled misc.ValueLoader[bool]
eventAuditEnabled map[string]bool

toAbortJobRunIDs misc.ValueLoader[string]
}

adaptiveLimit func(int64) int64
Expand Down Expand Up @@ -2582,11 +2584,27 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent,
supportedMessageTypesCache := make(map[string]*cacheValue)
supportedMessageEventsCache := make(map[string]*cacheValue)

toAbortJobRunIDs := config.GetStringVar("", "RSources.toAbortJobRunIDs")

// filter unsupported message types
var resp transformer.TransformerResponse
for i := range events {
event := &events[i]

if toAbortJobRunIDs != "" {
abortIDs := strings.Split(toAbortJobRunIDs, ",")
if slices.Contains(abortIDs, event.Metadata.SourceJobRunID) {
failedEvents = append(failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: 400,
Metadata: event.Metadata,
Error: "jobRunID configured to abort",
},
)
continue
}
}

if filter {
// filter unsupported message types
supportedTypes, ok := supportedMessageTypesCache[event.Destination.ID]
Expand Down Expand Up @@ -2616,21 +2634,41 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent,
messageEvent, typOk := event.Message["event"].(string)
if !typOk {
// add to FailedEvents
resp = transformer.TransformerResponse{Output: event.Message, StatusCode: 400, Metadata: event.Metadata, Error: "Invalid message event. Type assertion failed"}
failedEvents = append(failedEvents, resp)
failedEvents = append(
failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: 400,
Metadata: event.Metadata,
Error: "Invalid message event. Type assertion failed",
},
)
continue
}
if !slices.Contains(supportedEvents.values, messageEvent) {
resp = transformer.TransformerResponse{Output: event.Message, StatusCode: types.FilterEventCode, Metadata: event.Metadata, Error: "Event not supported"}
failedEvents = append(failedEvents, resp)
failedEvents = append(
failedEvents,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: types.FilterEventCode,
Metadata: event.Metadata,
Error: "Event not supported",
},
)
continue
}
}

}
// allow event
resp = transformer.TransformerResponse{Output: event.Message, StatusCode: 200, Metadata: event.Metadata}
responses = append(responses, resp)
responses = append(
responses,
transformer.TransformerResponse{
Output: event.Message,
StatusCode: 200,
Metadata: event.Metadata,
},
)
}

return transformer.Response{Events: responses, FailedEvents: failedEvents}
Expand Down
69 changes: 69 additions & 0 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3480,6 +3480,75 @@ var _ = Describe("Static Function Tests", func() {
response := ConvertToFilteredTransformerResponse(events, true)
Expect(response).To(Equal(expectedResponse))
})

It("filters events if jobRunID is configured to abort", func() {
destinationConfig := backendconfig.DestinationT{
DestinationDefinition: backendconfig.DestinationDefinitionT{
Config: map[string]interface{}{},
},
}
config.Set("RSources.toAbortJobRunIDs", "abortJobRunID")

events := []transformer.TransformerEvent{
{
Metadata: transformer.Metadata{
MessageID: "message-1",
SourceJobRunID: "abortJobRunID",
},
Message: map[string]interface{}{
"some-key-1": "some-value-1",
"type": " IDENTIFY ",
},
Destination: destinationConfig,
},
{
Metadata: transformer.Metadata{
MessageID: "message-2",
},
Message: map[string]interface{}{
"some-key-2": "some-value-2",
"type": "track",
},
Destination: destinationConfig,
},
{
Metadata: transformer.Metadata{
MessageID: "message-2",
},
Message: map[string]interface{}{
"some-key-2": "some-value-2",
"type": 123,
},
Destination: destinationConfig,
},
}

expectedResponse := transformer.Response{
Events: []transformer.TransformerResponse{
{
Output: events[1].Message,
StatusCode: 200,
Metadata: events[1].Metadata,
},
{
Output: events[2].Message,
StatusCode: 200,
Metadata: events[2].Metadata,
},
},
FailedEvents: []transformer.TransformerResponse{
{
Output: events[0].Message,
StatusCode: 400,
Metadata: events[0].Metadata,
Error: "jobRunID configured to abort",
},
},
}
response := ConvertToFilteredTransformerResponse(events, true)
Expect(response).To(Equal(expectedResponse))

})
})
})

Expand Down
1 change: 1 addition & 0 deletions router/batchrouter/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Handle struct {
mainLoopFreq misc.ValueLoader[time.Duration]
disableEgress bool
toAbortDestinationIDs misc.ValueLoader[string]
toAbortJobRunIDs misc.ValueLoader[string]
warehouseServiceMaxRetryTime misc.ValueLoader[time.Duration]
transformerURL string
datePrefixOverride misc.ValueLoader[string]
Expand Down
1 change: 1 addition & 0 deletions router/batchrouter/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func (brt *Handle) setupReloadableVars() {
brt.uploadFreq = config.GetReloadableDurationVar(30, time.Second, "BatchRouter.uploadFreqInS", "BatchRouter.uploadFreq")
brt.mainLoopFreq = config.GetReloadableDurationVar(30, time.Second, "BatchRouter.mainLoopFreq")
brt.toAbortDestinationIDs = config.GetReloadableStringVar("", "BatchRouter.toAbortDestinationIDs")
brt.toAbortJobRunIDs = config.GetReloadableStringVar("", "RSources.toAbortJobRunIDs")
brt.warehouseServiceMaxRetryTime = config.GetReloadableDurationVar(3, time.Hour, "BatchRouter.warehouseServiceMaxRetryTime", "BatchRouter.warehouseServiceMaxRetryTimeinHr")
brt.datePrefixOverride = config.GetReloadableStringVar("", "BatchRouter.datePrefixOverride")
brt.customDatePrefix = config.GetReloadableStringVar("", "BatchRouter.customDatePrefix")
Expand Down
10 changes: 9 additions & 1 deletion router/batchrouter/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin
for _, job := range destinationJobs.jobs {
var params router_utils.JobParameters
_ = json.Unmarshal(job.Parameters, &params)
if drain, reason := router_utils.ToBeDrained(job, params, router_utils.AbortConfigs{ToAbortDestinationIDs: brt.toAbortDestinationIDs.Load()}, destinationsMap); drain {
if drain, reason := router_utils.ToBeDrained(
job,
params,
router_utils.AbortConfigs{
DestinationIDs: brt.toAbortDestinationIDs.Load(),
JobRunIDs: brt.toAbortJobRunIDs.Load(),
},
destinationsMap,
); drain {
status := jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum + 1,
Expand Down
1 change: 1 addition & 0 deletions router/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func (rt *Handle) setupReloadableVars() {
rt.reloadableConfig.minRetryBackoff = config.GetReloadableDurationVar(10, time.Second, "Router.minRetryBackoff", "Router.minRetryBackoffInS")
rt.reloadableConfig.maxRetryBackoff = config.GetReloadableDurationVar(300, time.Second, "Router.maxRetryBackoff", "Router.maxRetryBackoffInS")
rt.reloadableConfig.toAbortDestinationIDs = config.GetReloadableStringVar("", "Router.toAbortDestinationIDs")
rt.reloadableConfig.toAbortJobRunIDs = config.GetReloadableStringVar("", "RSources.toAbortJobRunIDs")
rt.reloadableConfig.pickupFlushInterval = config.GetReloadableDurationVar(2, time.Second, "Router.pickupFlushInterval")
rt.reloadableConfig.failingJobsPenaltySleep = config.GetReloadableDurationVar(2000, time.Millisecond, "Router.failingJobsPenaltySleep")
rt.reloadableConfig.failingJobsPenaltyThreshold = config.GetReloadableFloat64Var(0.6, "Router.failingJobsPenaltyThreshold")
Expand Down
79 changes: 79 additions & 0 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,85 @@ var _ = Describe("router", func() {
Eventually(func() bool { return routerAborted && procErrorStored }, 5*time.Second, 100*time.Millisecond).Should(Equal(true))
})

It("aborts jobs that bear a abort configured jobRunId", func() {
config.Set("RSources.toAbortJobRunIDs", "someJobRunId")
router := &Handle{
Reporting: &reporting.NOOP{},
}
c.mockBackendConfig.EXPECT().AccessToken().AnyTimes()

router.Setup(gaDestinationDefinition, logger.NOP, conf, c.mockBackendConfig, c.mockRouterJobsDB, c.mockProcErrorsDB, transientsource.NewEmptyService(), rsources.NewNoOpService(), destinationdebugger.NewNoOpService())
mockNetHandle := mocksRouter.NewMockNetHandle(c.mockCtrl)
router.netHandle = mockNetHandle

gaPayload := `{"body": {"XML": {}, "FORM": {}, "JSON": {}}, "type": "REST", "files": {}, "method": "POST", "params": {"t": "event", "v": "1", "an": "RudderAndroidClient", "av": "1.0", "ds": "android-sdk", "ea": "Demo Track", "ec": "Demo Category", "el": "Demo Label", "ni": 0, "qt": 59268380964, "ul": "en-US", "cid": "anon_id", "tid": "UA-185645846-1", "uip": "[::1]", "aiid": "com.rudderlabs.android.sdk"}, "userId": "anon_id", "headers": {}, "version": "1", "endpoint": "https://www.google-analytics.com/collect"}`
parameters := fmt.Sprintf(`{"source_job_run_id": "someJobRunId", "source_id": "1fMCVYZboDlYlauh4GFsEo2JU77", "destination_id": "%s", "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", "received_at": "2021-06-28T10:04:48.527+05:30", "transform_at": "processor"}`, gaDestinationID) // skipcq: GO-R4002

unprocessedJobsList := []*jobsdb.JobT{
{
UUID: uuid.New(),
UserID: "u1",
JobID: 2010,
CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC),
CustomVal: customVal["GA"],
EventPayload: []byte(gaPayload),
LastJobStatus: jobsdb.JobStatusT{
AttemptNum: 0,
},
Parameters: []byte(parameters),
WorkspaceId: workspaceID,
},
}

payloadLimit := router.reloadableConfig.payloadLimit
c.mockRouterJobsDB.EXPECT().GetToProcess(gomock.Any(), jobsdb.GetQueryParams{
CustomValFilters: []string{customVal["GA"]},
ParameterFilters: []jobsdb.ParameterFilterT{{Name: "destination_id", Value: gaDestinationID}},
PayloadSizeLimit: payloadLimit.Load(),
JobsLimit: 10000,
}, nil).Times(1).Return(&jobsdb.MoreJobsResult{JobsResult: jobsdb.JobsResult{Jobs: unprocessedJobsList}}, nil)

var routerAborted bool
var procErrorStored bool

c.mockRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1)

c.mockProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1).
Do(func(ctx context.Context, jobList []*jobsdb.JobT) {
job := jobList[0]
var parameters map[string]interface{}
err := json.Unmarshal(job.Parameters, &parameters)
if err != nil {
panic(err)
}

Expect(job.JobID).To(Equal(unprocessedJobsList[0].JobID))
Expect(job.CustomVal).To(Equal(unprocessedJobsList[0].CustomVal))
Expect(job.UserID).To(Equal(unprocessedJobsList[0].UserID))
procErrorStored = true
})

c.mockRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) {
_ = f(jobsdb.EmptyUpdateSafeTx())
}).Return(nil).Times(1)

c.mockRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1).
Do(func(ctx context.Context, tx jobsdb.UpdateSafeTx, drainList []*jobsdb.JobStatusT, _, _ interface{}) {
Expect(drainList).To(HaveLen(1))
assertJobStatus(unprocessedJobsList[0], drainList[0], jobsdb.Aborted.State, "410", `{"reason": "job expired"}`, 0)
routerAborted = true
})

<-router.backendConfigInitialized
worker := newPartitionWorker(context.Background(), router, gaDestinationID)
defer worker.Stop()
Expect(worker.Work()).To(BeTrue())
Expect(worker.pickupCount).To(Equal(len(unprocessedJobsList)))
Eventually(func() bool { return routerAborted && procErrorStored }, 5*time.Second, 100*time.Millisecond).Should(Equal(true))

})

It("aborts events that have reached max retries", func() {
config.Set("Router.jobRetention", "24h")
mockNetHandle := mocksRouter.NewMockNetHandle(c.mockCtrl)
Expand Down
4 changes: 3 additions & 1 deletion router/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ type reloadableConfig struct {
jobsBatchTimeout misc.ValueLoader[time.Duration]
failingJobsPenaltyThreshold misc.ValueLoader[float64]
failingJobsPenaltySleep misc.ValueLoader[time.Duration]
toAbortDestinationIDs misc.ValueLoader[string]
noOfJobsToBatchInAWorker misc.ValueLoader[int]
jobsDBCommandTimeout misc.ValueLoader[time.Duration]
jobdDBMaxRetries misc.ValueLoader[int]
Expand All @@ -78,4 +77,7 @@ type reloadableConfig struct {
transformerProxy misc.ValueLoader[bool]
skipRtAbortAlertForTransformation misc.ValueLoader[bool] // represents if event delivery(via transformerProxy) should be alerted via router-aborted-count alert def
skipRtAbortAlertForDelivery misc.ValueLoader[bool] // represents if transformation(router or batch) should be alerted via router-aborted-count alert def

toAbortDestinationIDs misc.ValueLoader[string]
toAbortJobRunIDs misc.ValueLoader[string]
}
31 changes: 17 additions & 14 deletions router/utils/utils.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package utils

import (
"slices"
"strings"
"time"

"golang.org/x/exp/slices"

"github.com/tidwall/sjson"

"github.com/rudderlabs/rudder-go-kit/config"
Expand Down Expand Up @@ -67,14 +66,17 @@ type JobParameters struct {
}

type AbortConfigs struct {
ToAbortDestinationIDs string
// source
// connection
// jobRunID
DestinationIDs string
JobRunIDs string
// ...
}

func ToBeDrained(job *jobsdb.JobT, jobParams JobParameters, abortConfig AbortConfigs, destinationsMap map[string]*DestinationWithSources) (bool, string) {
func ToBeDrained(
job *jobsdb.JobT,
jobParams JobParameters,
abortConfig AbortConfigs,
destinationsMap map[string]*DestinationWithSources,
) (bool, string) {
// drain if job is older than the destination's retention time
createdAt := job.CreatedAt
destID := jobParams.DestinationID
Expand All @@ -86,18 +88,19 @@ func ToBeDrained(job *jobsdb.JobT, jobParams JobParameters, abortConfig AbortCon
return true, "destination is disabled"
}

toAbortDestinationIDs := abortConfig.ToAbortDestinationIDs
if toAbortDestinationIDs != "" {
abortIDs := strings.Split(toAbortDestinationIDs, ",")
if abortConfig.DestinationIDs != "" {
abortIDs := strings.Split(abortConfig.DestinationIDs, ",")
if slices.Contains(abortIDs, destID) {
return true, "destination configured to abort"
}
}

// toAbortSourceIDS
// toAbortConnection(sourceID, destID)
// toAbortJobRunID/TaskRunID
// ...
if abortConfig.JobRunIDs != "" {
abortIDs := strings.Split(abortConfig.JobRunIDs, ",")
if slices.Contains(abortIDs, jobParams.SourceJobRunID) {
return true, "jobRunID configured to abort"
}
}

return false, ""
}
Expand Down
10 changes: 9 additions & 1 deletion router/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,15 @@ func (w *worker) workLoop() {
panic(fmt.Errorf("unmarshalling of job parameters failed for job %d (%s): %w", job.JobID, string(job.Parameters), err))
}
w.rt.destinationsMapMu.RLock()
abort, abortReason := routerutils.ToBeDrained(job, parameters, routerutils.AbortConfigs{ToAbortDestinationIDs: w.rt.reloadableConfig.toAbortDestinationIDs.Load()}, w.rt.destinationsMap)
abort, abortReason := routerutils.ToBeDrained(
job,
parameters,
routerutils.AbortConfigs{
DestinationIDs: w.rt.reloadableConfig.toAbortDestinationIDs.Load(),
JobRunIDs: w.rt.reloadableConfig.toAbortJobRunIDs.Load(),
},
w.rt.destinationsMap,
)
abortTag := abortReason
w.rt.destinationsMapMu.RUnlock()
if !abort {
Expand Down

0 comments on commit 90fdddf

Please sign in to comment.