diff --git a/backend/core/models/blueprint.go b/backend/core/models/blueprint.go index 354c94408b0..f30adec9f05 100644 --- a/backend/core/models/blueprint.go +++ b/backend/core/models/blueprint.go @@ -82,9 +82,13 @@ func (BlueprintScope) TableName() string { return "_devlake_blueprint_scopes" } +type TriggerSyncPolicy struct { + SkipCollectors bool `json:"skipCollectors"` + FullSync bool `json:"fullSync"` +} + type SyncPolicy struct { - SkipOnFail bool `json:"skipOnFail"` - SkipCollectors bool `json:"skipCollectors"` - FullSync bool `json:"fullSync"` - TimeAfter *time.Time `json:"timeAfter"` + SkipOnFail bool `json:"skipOnFail"` + TimeAfter *time.Time `json:"timeAfter"` + TriggerSyncPolicy } diff --git a/backend/helpers/e2ehelper/data_flow_tester.go b/backend/helpers/e2ehelper/data_flow_tester.go index b409fd3604a..82fff33c14a 100644 --- a/backend/helpers/e2ehelper/data_flow_tester.go +++ b/backend/helpers/e2ehelper/data_flow_tester.go @@ -228,7 +228,9 @@ func (t *DataFlowTester) Subtask(subtaskMeta plugin.SubTaskMeta, taskData interf // SubtaskContext creates a subtask context func (t *DataFlowTester) SubtaskContext(taskData interface{}) plugin.SubTaskContext { syncPolicy := &models.SyncPolicy{ - FullSync: true, + TriggerSyncPolicy: models.TriggerSyncPolicy{ + FullSync: true, + }, } return contextimpl.NewStandaloneSubTaskContext(context.Background(), runner.CreateBasicRes(t.Cfg, t.Log, t.Db), t.Name, taskData, t.Name, syncPolicy) } diff --git a/backend/helpers/pluginhelper/api/collector_state_manager_test.go b/backend/helpers/pluginhelper/api/collector_state_manager_test.go index 5ffc39805bb..f42eecd2a95 100644 --- a/backend/helpers/pluginhelper/api/collector_state_manager_test.go +++ b/backend/helpers/pluginhelper/api/collector_state_manager_test.go @@ -101,7 +101,7 @@ func TestCollectorStateManager(t *testing.T) { { name: "Full sync - with timeAfter", state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time1}, - syncPolicy: &models.SyncPolicy{FullSync: true}, + syncPolicy: &models.SyncPolicy{TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}}, expectedIsIncremental: false, expectedSince: &time1, expectedNewStateTimeAfter: &time1, @@ -109,7 +109,7 @@ func TestCollectorStateManager(t *testing.T) { { name: "Full sync - with newer timeAfter", state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time1}, - syncPolicy: &models.SyncPolicy{TimeAfter: &time2, FullSync: true}, + syncPolicy: &models.SyncPolicy{TimeAfter: &time2, TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}}, expectedIsIncremental: false, expectedSince: &time2, expectedNewStateTimeAfter: &time2, @@ -117,7 +117,7 @@ func TestCollectorStateManager(t *testing.T) { { name: "Full sync - with older timeAfter", state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time1}, - syncPolicy: &models.SyncPolicy{TimeAfter: &time0, FullSync: true}, + syncPolicy: &models.SyncPolicy{TimeAfter: &time0, TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}}, expectedIsIncremental: false, expectedSince: &time0, expectedNewStateTimeAfter: &time0, @@ -125,7 +125,7 @@ func TestCollectorStateManager(t *testing.T) { { name: "Full sync - without timeAfter", state: &models.CollectorLatestState{TimeAfter: nil, LatestSuccessStart: &time1}, - syncPolicy: &models.SyncPolicy{FullSync: true}, + syncPolicy: &models.SyncPolicy{TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}}, expectedIsIncremental: false, expectedSince: nil, expectedNewStateTimeAfter: nil, diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager_test.go b/backend/helpers/pluginhelper/api/subtask_state_manager_test.go index 9bbcc4f396b..06eeaa43374 100644 --- a/backend/helpers/pluginhelper/api/subtask_state_manager_test.go +++ b/backend/helpers/pluginhelper/api/subtask_state_manager_test.go @@ -111,7 +111,7 @@ func TestSubtaskStateManager(t *testing.T) { { name: "Full sync - with timeAfter", state: &models.SubtaskState{TimeAfter: &time1, PrevStartedAt: &time1}, - syncPolicy: &models.SyncPolicy{FullSync: true}, + syncPolicy: &models.SyncPolicy{TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}}, expectedIsIncremental: false, expectedSince: &time1, expectedNewStateTimeAfter: &time1, @@ -119,7 +119,7 @@ func TestSubtaskStateManager(t *testing.T) { { name: "Full sync - with newer timeAfter", state: &models.SubtaskState{TimeAfter: &time1, PrevStartedAt: &time1}, - syncPolicy: &models.SyncPolicy{TimeAfter: &time2, FullSync: true}, + syncPolicy: &models.SyncPolicy{TimeAfter: &time2, TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}}, expectedIsIncremental: false, expectedSince: &time2, expectedNewStateTimeAfter: &time2, @@ -127,7 +127,7 @@ func TestSubtaskStateManager(t *testing.T) { { name: "Full sync - with older timeAfter", state: &models.SubtaskState{TimeAfter: &time1, PrevStartedAt: &time1}, - syncPolicy: &models.SyncPolicy{TimeAfter: &time0, FullSync: true}, + syncPolicy: &models.SyncPolicy{TimeAfter: &time0, TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}}, expectedIsIncremental: false, expectedSince: &time0, expectedNewStateTimeAfter: &time0, @@ -135,7 +135,7 @@ func TestSubtaskStateManager(t *testing.T) { { name: "Full sync - without timeAfter", state: &models.SubtaskState{TimeAfter: nil, PrevStartedAt: &time1}, - syncPolicy: &models.SyncPolicy{FullSync: true}, + syncPolicy: &models.SyncPolicy{TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}}, expectedIsIncremental: false, expectedSince: nil, expectedNewStateTimeAfter: nil, diff --git a/backend/server/api/blueprints/blueprints.go b/backend/server/api/blueprints/blueprints.go index cbe36cb438d..23383d4682f 100644 --- a/backend/server/api/blueprints/blueprints.go +++ b/backend/server/api/blueprints/blueprints.go @@ -165,7 +165,7 @@ func Patch(c *gin.Context) { // @Tags framework/blueprints // @Accept application/json // @Param blueprintId path string true "blueprintId" -// @Param skipCollectors body models.SyncPolicy false "json" +// @Param skipCollectors body models.TriggerSyncPolicy false "json" // @Success 200 {object} models.Pipeline // @Failure 400 {object} shared.ApiBody "Bad Request" // @Failure 500 {object} shared.ApiBody "Internal Error" @@ -178,18 +178,18 @@ func Trigger(c *gin.Context) { return } - syncPolicy := &models.SyncPolicy{} + triggerSyncPolicy := &models.TriggerSyncPolicy{} if c.Request.Body == nil || c.Request.ContentLength == 0 { - syncPolicy.SkipCollectors = false - syncPolicy.FullSync = false + triggerSyncPolicy.SkipCollectors = false + triggerSyncPolicy.FullSync = false } else { - err = c.ShouldBindJSON(syncPolicy) + err = c.ShouldBindJSON(triggerSyncPolicy) if err != nil { shared.ApiOutputError(c, errors.BadInput.Wrap(err, "error binding request body")) return } } - pipeline, err := services.TriggerBlueprint(id, syncPolicy, true) + pipeline, err := services.TriggerBlueprint(id, triggerSyncPolicy, true) if err != nil { shared.ApiOutputError(c, errors.Default.Wrap(err, "error triggering blueprint")) return diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go index cf7d20ccffb..d400abae27a 100644 --- a/backend/server/services/blueprint.go +++ b/backend/server/services/blueprint.go @@ -400,7 +400,7 @@ func SequentializePipelinePlans(plans ...models.PipelinePlan) models.PipelinePla } // TriggerBlueprint triggers blueprint immediately -func TriggerBlueprint(id uint64, syncPolicy *models.SyncPolicy, shouldSanitize bool) (*models.Pipeline, errors.Error) { +func TriggerBlueprint(id uint64, triggerSyncPolicy *models.TriggerSyncPolicy, shouldSanitize bool) (*models.Pipeline, errors.Error) { // load record from db blueprint, err := GetBlueprint(id, false) if err != nil { @@ -410,9 +410,13 @@ func TriggerBlueprint(id uint64, syncPolicy *models.SyncPolicy, shouldSanitize b if !blueprint.Enable { return nil, errors.BadInput.New("blueprint is not enabled") } - blueprint.SkipCollectors = syncPolicy.SkipCollectors - blueprint.FullSync = syncPolicy.FullSync - pipeline, err := createPipelineByBlueprint(blueprint, syncPolicy) + blueprint.SkipCollectors = triggerSyncPolicy.SkipCollectors + blueprint.FullSync = triggerSyncPolicy.FullSync + pipeline, err := createPipelineByBlueprint(blueprint, &models.SyncPolicy{ + SkipOnFail: false, + TimeAfter: nil, + TriggerSyncPolicy: *triggerSyncPolicy, + }) if err != nil { return nil, err }