Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick #7888 fix(framework): update trigger api's request body schema #7890

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions backend/core/models/blueprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,13 @@ func (BlueprintScope) TableName() string {
return "_devlake_blueprint_scopes"
}

type TriggerSyncPolicy struct {
SkipCollectors bool `json:"skipCollectors"`
FullSync bool `json:"fullSync"`
}

type SyncPolicy struct {
SkipOnFail bool `json:"skipOnFail"`
SkipCollectors bool `json:"skipCollectors"`
FullSync bool `json:"fullSync"`
TimeAfter *time.Time `json:"timeAfter"`
SkipOnFail bool `json:"skipOnFail"`
TimeAfter *time.Time `json:"timeAfter"`
TriggerSyncPolicy
}
4 changes: 3 additions & 1 deletion backend/helpers/e2ehelper/data_flow_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ func (t *DataFlowTester) Subtask(subtaskMeta plugin.SubTaskMeta, taskData interf
// SubtaskContext creates a subtask context
func (t *DataFlowTester) SubtaskContext(taskData interface{}) plugin.SubTaskContext {
syncPolicy := &models.SyncPolicy{
FullSync: true,
TriggerSyncPolicy: models.TriggerSyncPolicy{
FullSync: true,
},
}
return contextimpl.NewStandaloneSubTaskContext(context.Background(), runner.CreateBasicRes(t.Cfg, t.Log, t.Db), t.Name, taskData, t.Name, syncPolicy)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,31 +101,31 @@ func TestCollectorStateManager(t *testing.T) {
{
name: "Full sync - with timeAfter",
state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time1},
syncPolicy: &models.SyncPolicy{FullSync: true},
syncPolicy: &models.SyncPolicy{TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}},
expectedIsIncremental: false,
expectedSince: &time1,
expectedNewStateTimeAfter: &time1,
},
{
name: "Full sync - with newer timeAfter",
state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time1},
syncPolicy: &models.SyncPolicy{TimeAfter: &time2, FullSync: true},
syncPolicy: &models.SyncPolicy{TimeAfter: &time2, TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}},
expectedIsIncremental: false,
expectedSince: &time2,
expectedNewStateTimeAfter: &time2,
},
{
name: "Full sync - with older timeAfter",
state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time1},
syncPolicy: &models.SyncPolicy{TimeAfter: &time0, FullSync: true},
syncPolicy: &models.SyncPolicy{TimeAfter: &time0, TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}},
expectedIsIncremental: false,
expectedSince: &time0,
expectedNewStateTimeAfter: &time0,
},
{
name: "Full sync - without timeAfter",
state: &models.CollectorLatestState{TimeAfter: nil, LatestSuccessStart: &time1},
syncPolicy: &models.SyncPolicy{FullSync: true},
syncPolicy: &models.SyncPolicy{TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}},
expectedIsIncremental: false,
expectedSince: nil,
expectedNewStateTimeAfter: nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,31 +111,31 @@ func TestSubtaskStateManager(t *testing.T) {
{
name: "Full sync - with timeAfter",
state: &models.SubtaskState{TimeAfter: &time1, PrevStartedAt: &time1},
syncPolicy: &models.SyncPolicy{FullSync: true},
syncPolicy: &models.SyncPolicy{TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}},
expectedIsIncremental: false,
expectedSince: &time1,
expectedNewStateTimeAfter: &time1,
},
{
name: "Full sync - with newer timeAfter",
state: &models.SubtaskState{TimeAfter: &time1, PrevStartedAt: &time1},
syncPolicy: &models.SyncPolicy{TimeAfter: &time2, FullSync: true},
syncPolicy: &models.SyncPolicy{TimeAfter: &time2, TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}},
expectedIsIncremental: false,
expectedSince: &time2,
expectedNewStateTimeAfter: &time2,
},
{
name: "Full sync - with older timeAfter",
state: &models.SubtaskState{TimeAfter: &time1, PrevStartedAt: &time1},
syncPolicy: &models.SyncPolicy{TimeAfter: &time0, FullSync: true},
syncPolicy: &models.SyncPolicy{TimeAfter: &time0, TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}},
expectedIsIncremental: false,
expectedSince: &time0,
expectedNewStateTimeAfter: &time0,
},
{
name: "Full sync - without timeAfter",
state: &models.SubtaskState{TimeAfter: nil, PrevStartedAt: &time1},
syncPolicy: &models.SyncPolicy{FullSync: true},
syncPolicy: &models.SyncPolicy{TriggerSyncPolicy: models.TriggerSyncPolicy{FullSync: true}},
expectedIsIncremental: false,
expectedSince: nil,
expectedNewStateTimeAfter: nil,
Expand Down
12 changes: 6 additions & 6 deletions backend/server/api/blueprints/blueprints.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func Patch(c *gin.Context) {
// @Tags framework/blueprints
// @Accept application/json
// @Param blueprintId path string true "blueprintId"
// @Param skipCollectors body models.SyncPolicy false "json"
// @Param skipCollectors body models.TriggerSyncPolicy false "json"
// @Success 200 {object} models.Pipeline
// @Failure 400 {object} shared.ApiBody "Bad Request"
// @Failure 500 {object} shared.ApiBody "Internal Error"
Expand All @@ -178,18 +178,18 @@ func Trigger(c *gin.Context) {
return
}

syncPolicy := &models.SyncPolicy{}
triggerSyncPolicy := &models.TriggerSyncPolicy{}
if c.Request.Body == nil || c.Request.ContentLength == 0 {
syncPolicy.SkipCollectors = false
syncPolicy.FullSync = false
triggerSyncPolicy.SkipCollectors = false
triggerSyncPolicy.FullSync = false
} else {
err = c.ShouldBindJSON(syncPolicy)
err = c.ShouldBindJSON(triggerSyncPolicy)
if err != nil {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, "error binding request body"))
return
}
}
pipeline, err := services.TriggerBlueprint(id, syncPolicy, true)
pipeline, err := services.TriggerBlueprint(id, triggerSyncPolicy, true)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error triggering blueprint"))
return
Expand Down
12 changes: 8 additions & 4 deletions backend/server/services/blueprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func SequentializePipelinePlans(plans ...models.PipelinePlan) models.PipelinePla
}

// TriggerBlueprint triggers blueprint immediately
func TriggerBlueprint(id uint64, syncPolicy *models.SyncPolicy, shouldSanitize bool) (*models.Pipeline, errors.Error) {
func TriggerBlueprint(id uint64, triggerSyncPolicy *models.TriggerSyncPolicy, shouldSanitize bool) (*models.Pipeline, errors.Error) {
// load record from db
blueprint, err := GetBlueprint(id, false)
if err != nil {
Expand All @@ -410,9 +410,13 @@ func TriggerBlueprint(id uint64, syncPolicy *models.SyncPolicy, shouldSanitize b
if !blueprint.Enable {
return nil, errors.BadInput.New("blueprint is not enabled")
}
blueprint.SkipCollectors = syncPolicy.SkipCollectors
blueprint.FullSync = syncPolicy.FullSync
pipeline, err := createPipelineByBlueprint(blueprint, syncPolicy)
blueprint.SkipCollectors = triggerSyncPolicy.SkipCollectors
blueprint.FullSync = triggerSyncPolicy.FullSync
pipeline, err := createPipelineByBlueprint(blueprint, &models.SyncPolicy{
SkipOnFail: false,
TimeAfter: nil,
TriggerSyncPolicy: *triggerSyncPolicy,
})
if err != nil {
return nil, err
}
Expand Down
Loading