Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
wanisfahmyDE authored Feb 22, 2024
2 parents 406d856 + 6afa8d7 commit 1f9eb3b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 37 deletions.
75 changes: 39 additions & 36 deletions backend/server/services/blueprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,8 @@ func (bj BlueprintJob) Run() {

// CreateBlueprint accepts a Blueprint instance and insert it to database
func CreateBlueprint(blueprint *models.Blueprint) errors.Error {
err := validateBlueprintAndMakePlan(blueprint)
if err != nil {
return err
}
err = bpManager.SaveDbBlueprint(blueprint)
if err != nil {
return err
}
if err := SanitizeBlueprint(blueprint); err != nil {
return errors.Convert(err)
}
err = ReloadBlueprints(cronManager)
if err != nil {
return errors.Internal.Wrap(err, "error reloading blueprints")
}
return nil
_, err := saveBlueprint(blueprint)
return err
}

// GetBlueprints returns a paginated list of Blueprints based on `query`
Expand Down Expand Up @@ -204,9 +190,9 @@ func saveBlueprint(blueprint *models.Blueprint) (*models.Blueprint, errors.Error
}

// reload schedule
err = ReloadBlueprints(cronManager)
err = reloadBlueprint(blueprint)
if err != nil {
return nil, errors.Internal.Wrap(err, "error reloading blueprints")
return nil, err
}
// done
return blueprint, nil
Expand Down Expand Up @@ -260,14 +246,10 @@ func DeleteBlueprint(id uint64) errors.Error {
}

var blueprintReloadLock sync.Mutex
var bpCronIdMap map[uint64]cron.EntryID

// ReloadBlueprints FIXME ...
func ReloadBlueprints(c *cron.Cron) (err errors.Error) {
// preventing concurrent reloads. It would be better to use Table Lock , however, it requires massive refactor
// like the `bpManager` must accept transaction. Use mutex as a temporary fix.
blueprintReloadLock.Lock()
defer blueprintReloadLock.Unlock()

// ReloadBlueprints reloades cronjobs based on blueprints
func ReloadBlueprints() (err errors.Error) {
enable := true
isManual := false
blueprints, _, err := bpManager.GetDbBlueprints(&services.GetBlueprintQuery{
Expand All @@ -277,27 +259,48 @@ func ReloadBlueprints(c *cron.Cron) (err errors.Error) {
if err != nil {
return err
}
for _, e := range c.Entries() {
c.Remove(e.ID)
for _, e := range cronManager.Entries() {
cronManager.Remove(e.ID)
}
c.Stop()
cronManager.Stop()
bpCronIdMap = make(map[uint64]cron.EntryID, len(blueprints))
for _, blueprint := range blueprints {
blueprintLog.Info("Add blueprint id:[%d] cronConfg[%s] to cron job", blueprint.ID, blueprint.CronConfig)
blueprintJob := &BlueprintJob{
Blueprint: blueprint,
}
if _, err := c.AddJob(blueprint.CronConfig, blueprintJob); err != nil {
blueprintLog.Error(err, failToCreateCronJob)
return errors.Default.Wrap(err, "created cron job failed")
err := reloadBlueprint(blueprint)
if err != nil {
return err
}
}
if len(blueprints) > 0 {
c.Start()
cronManager.Start()
}
logger.Info("total %d blueprints were scheduled", len(blueprints))
return nil
}

func reloadBlueprint(blueprint *models.Blueprint) errors.Error {
// preventing concurrent reloads. It would be better to use Table Lock , however, it requires massive refactor
// like the `bpManager` must accept transaction. Use mutex as a temporary fix.
blueprintReloadLock.Lock()
defer blueprintReloadLock.Unlock()

cronId, scheduled := bpCronIdMap[blueprint.ID]
if scheduled {
cronManager.Remove(cronId)
delete(bpCronIdMap, blueprint.ID)
logger.Info("removed blueprint %d from cronjobs, cron id: %v", blueprint.ID, cronId)
}
if blueprint.Enable && !blueprint.IsManual {
if cronId, err := cronManager.AddJob(blueprint.CronConfig, &BlueprintJob{blueprint}); err != nil {
blueprintLog.Error(err, failToCreateCronJob)
return errors.Default.Wrap(err, "created cron job failed")
} else {
bpCronIdMap[blueprint.ID] = cronId
logger.Info("added blueprint %d to cronjobs, cron id: %v, cron config: %s", blueprint.ID, cronId, blueprint.CronConfig)
}
}
return nil
}

func createPipelineByBlueprint(blueprint *models.Blueprint, syncPolicy *models.SyncPolicy) (*models.Pipeline, errors.Error) {
var plan models.PipelinePlan
var err errors.Error
Expand Down
2 changes: 1 addition & 1 deletion backend/server/services/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func pipelineServiceInit() {
panic(err)
}

err = ReloadBlueprints(cronManager)
err = ReloadBlueprints()
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 1f9eb3b

Please sign in to comment.