Skip to content

Commit

Permalink
[eclipse-kanto#56] Sync download/update/activate phases across agents (
Browse files Browse the repository at this point in the history
…eclipse-kanto#57)

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov3@bosch.com>
  • Loading branch information
dimitar-dimitrow authored Apr 11, 2024
1 parent bbeef46 commit 84d770b
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 187 deletions.
19 changes: 10 additions & 9 deletions updatem/orchestration/update_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,11 @@ type updateOperation struct {
desiredState *types.DesiredState
statesPerDomain map[api.UpdateManager]*types.DesiredState

done chan bool
phaseChannels map[phase]chan bool

errChan chan bool
errMsg string

identDone chan bool
identErrChan chan bool
identErrMsg string

rebootRequired bool

desiredStateCallback api.DesiredStateFeedbackHandler
Expand Down Expand Up @@ -76,13 +72,10 @@ func newUpdateOperation(domainAgents map[string]api.UpdateManager, activityID st

statesPerDomain: statesPerDomain,
desiredState: desiredState,
phaseChannels: generatePhaseChannels(),

done: make(chan bool, 1),
errChan: make(chan bool, 1),

identDone: make(chan bool, 1),
identErrChan: make(chan bool, 1),

desiredStateCallback: desiredStateCallback,
}, nil
}
Expand All @@ -93,3 +86,11 @@ func (operation *updateOperation) updateStatus(status types.StatusType) {

operation.status = status
}

func generatePhaseChannels() map[phase]chan bool {
phaseChannels := make(map[phase]chan bool, len(orderedPhases))
for _, phase := range orderedPhases {
phaseChannels[phase] = make(chan bool, 1)
}
return phaseChannels
}
5 changes: 1 addition & 4 deletions updatem/orchestration/update_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ func TestNewUpdateOperation(t *testing.T) {
assert.Equal(t, types.StatusIdentifying, testOp.domains["domain2"])
assert.NotNil(t, testOp.actions)
assert.NotNil(t, testOp.statesPerDomain)
assert.NotNil(t, testOp.done)
assert.NotNil(t, testOp.errChan)
assert.NotNil(t, testOp.identDone)
assert.NotNil(t, testOp.identErrChan)
assert.NotNil(t, testOp.phaseChannels)
assert.False(t, testOp.rebootRequired)
assert.Equal(t, handler, testOp.desiredStateCallback)
})
Expand Down
73 changes: 49 additions & 24 deletions updatem/orchestration/update_orchestrator_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,44 +30,69 @@ func (orchestrator *updateOrchestrator) apply(ctx context.Context) (bool, error)
}(updateManagerForDomain, statePerDomain)
}

if err := orchestrator.waitIdentification(ctx); err != nil {
wait, err := orchestrator.waitPhase(ctx, phaseIdentification, handlePhaseCompletion)
if err != nil {
return false, err
}

err := orchestrator.waitCompletion(ctx)
for i := 1; i < len(orderedPhases) && wait; i++ {
wait, err = orchestrator.waitPhase(ctx, orderedPhases[i], handlePhaseCompletion)
}
return orchestrator.operation.rebootRequired && orchestrator.operation.status == types.StatusCompleted, err
}

func (orchestrator *updateOrchestrator) waitIdentification(ctx context.Context) error {
type phaseHandler func(ctx context.Context, phase phase, orchestrator *updateOrchestrator)

func (orchestrator *updateOrchestrator) waitPhase(ctx context.Context, currentPhase phase, handle phaseHandler) (bool, error) {
select {
case <-time.After(orchestrator.phaseTimeout):
orchestrator.operation.updateStatus(types.StatusIdentificationFailed)
return fmt.Errorf("identification phase not completed in %v", orchestrator.phaseTimeout)
case <-orchestrator.operation.identErrChan:
return fmt.Errorf(orchestrator.operation.identErrMsg)
if currentPhase == phaseIdentification {
orchestrator.operation.updateStatus(types.StatusIdentificationFailed)
} else {
orchestrator.operation.updateStatus(types.StatusIncomplete)
}
return false, fmt.Errorf("%s phase not done in %v", currentPhase, orchestrator.phaseTimeout)
case <-orchestrator.operation.errChan:
return fmt.Errorf(orchestrator.operation.errMsg)
case <-orchestrator.operation.identDone:
logger.Debug("the identification phase is completed")
return nil
return false, fmt.Errorf(orchestrator.operation.errMsg)
case running := <-orchestrator.operation.phaseChannels[currentPhase]:
logger.Info("the %s phase is done", currentPhase)
if running {
go handle(ctx, currentPhase, orchestrator)
return true, nil
}
return false, nil
case <-ctx.Done():
orchestrator.operation.updateStatus(types.StatusIncomplete)
return fmt.Errorf("the update manager instance is terminated")
return false, fmt.Errorf("the update manager instance is terminated")
}
}

func (orchestrator *updateOrchestrator) waitCompletion(ctx context.Context) error {
select {
case <-time.After(orchestrator.phaseTimeout):
orchestrator.operation.updateStatus(types.StatusIncomplete)
return fmt.Errorf("update operation not completed in %v", orchestrator.phaseTimeout)
case <-orchestrator.operation.errChan:
return fmt.Errorf(orchestrator.operation.errMsg)
case <-orchestrator.operation.done:
return nil
case <-ctx.Done():
orchestrator.operation.updateStatus(types.StatusIncomplete)
return fmt.Errorf("the update manager instance is terminated")
func handlePhaseCompletion(ctx context.Context, completedPhase phase, orchestrator *updateOrchestrator) {
orchestrator.operationLock.Lock()
defer orchestrator.operationLock.Unlock()

if orchestrator.operation == nil {
return
}

executeCommand := func(status types.StatusType, command types.CommandType) {
for domain, domainStatus := range orchestrator.operation.domains {
if domainStatus == status {
orchestrator.command(ctx, orchestrator.operation.activityID, domain, command)
}
}
}
switch completedPhase {
case phaseIdentification:
executeCommand(types.StatusIdentified, types.CommandDownload)
case phaseDownload:
executeCommand(types.BaselineStatusDownloadSuccess, types.CommandUpdate)
case phaseUpdate:
executeCommand(types.BaselineStatusUpdateSuccess, types.CommandActivate)
case phaseActivation:
executeCommand(types.BaselineStatusActivationSuccess, types.CommandCleanup)
default:
logger.Error("unknown phase %s", completedPhase)
}
}

Expand Down
Loading

0 comments on commit 84d770b

Please sign in to comment.