diff --git a/pkg/app/pipedv1/cmd/piped/piped.go b/pkg/app/pipedv1/cmd/piped/piped.go index 82e998acc7..3f8ef8b880 100644 --- a/pkg/app/pipedv1/cmd/piped/piped.go +++ b/pkg/app/pipedv1/cmd/piped/piped.go @@ -371,6 +371,13 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { // TODO: Implement the drift detector controller. } + // Initialize secret decrypter. + decrypter, err := p.initializeSecretDecrypter(cfg) + if err != nil { + input.Logger.Error("failed to initialize secret decrypter", zap.Error(err)) + return err + } + // Start running deployment controller. { c := controller.NewController( @@ -380,6 +387,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { deploymentLister, commandLister, notifier, + decrypter, p.gracePeriod, input.Logger, tracerProvider, @@ -667,7 +675,6 @@ func (p *piped) runPlugins(ctx context.Context, pluginsCfg []config.PipedPlugin, return plugins, nil } -// TODO: Remove this once the decryption task by plugin call to the plugin service is implemented. func (p *piped) initializeSecretDecrypter(cfg *config.PipedSpec) (crypto.Decrypter, error) { sm := cfg.SecretManagement if sm == nil { diff --git a/pkg/app/pipedv1/controller/controller.go b/pkg/app/pipedv1/controller/controller.go index a2827a5f2d..87ca073aff 100644 --- a/pkg/app/pipedv1/controller/controller.go +++ b/pkg/app/pipedv1/controller/controller.go @@ -75,6 +75,10 @@ type notifier interface { Notify(event model.NotificationEvent) } +type secretDecrypter interface { + Decrypt(string) (string, error) +} + type DeploymentController interface { Run(ctx context.Context) error } @@ -90,6 +94,7 @@ type controller struct { deploymentLister deploymentLister commandLister commandLister notifier notifier + secretDecrypter secretDecrypter // gRPC clients to communicate with plugins. pluginClients []pluginapi.PluginClient @@ -130,6 +135,7 @@ func NewController( deploymentLister deploymentLister, commandLister commandLister, notifier notifier, + secretDecrypter secretDecrypter, gracePeriod time.Duration, logger *zap.Logger, tracerProvider trace.TracerProvider, @@ -142,6 +148,7 @@ func NewController( deploymentLister: deploymentLister, commandLister: commandLister, notifier: notifier, + secretDecrypter: secretDecrypter, planners: make(map[string]*planner), donePlanners: make(map[string]time.Time), @@ -443,6 +450,7 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) ( c.apiClient, c.gitClient, c.notifier, + c.secretDecrypter, c.logger, c.tracerProvider, ) @@ -581,6 +589,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment) c.gitClient, c.stageBasedPluginsMap, c.notifier, + c.secretDecrypter, c.logger, c.tracerProvider, ) diff --git a/pkg/app/pipedv1/controller/planner.go b/pkg/app/pipedv1/controller/planner.go index c527fcd5cd..ce5140f88f 100644 --- a/pkg/app/pipedv1/controller/planner.go +++ b/pkg/app/pipedv1/controller/planner.go @@ -75,7 +75,10 @@ type planner struct { notifier notifier metadataStore metadatastore.MetadataStore - // TODO: Find a way to show log from pluggin's planner + // The secretDecrypter is used to decrypt secrets + // which encrypted using PipeCD built-in secret management. + secretDecrypter secretDecrypter + logger *zap.Logger tracer trace.Tracer @@ -98,6 +101,7 @@ func newPlanner( apiClient apiClient, gitClient gitClient, notifier notifier, + secretDecrypter secretDecrypter, logger *zap.Logger, tracerProvider trace.TracerProvider, ) *planner { @@ -121,6 +125,7 @@ func newPlanner( gitClient: gitClient, metadataStore: metadatastore.NewMetadataStore(apiClient, d), notifier: notifier, + secretDecrypter: secretDecrypter, doneDeploymentStatus: d.Status, cancelledCh: make(chan *model.ReportableCommand, 1), nowFunc: time.Now, @@ -193,31 +198,34 @@ func (p *planner) Run(ctx context.Context) error { Branch: p.deployment.GitPath.Repo.Branch, } - runningDSP := deploysource.NewProvider( - filepath.Join(p.workingDir, "running-deploysource"), - deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "running", p.lastSuccessfulCommitHash), - p.deployment.GetGitPath(), nil, // TODO: pass secret decrypter? - ) - rds, err := runningDSP.Get(ctx, io.Discard) // TODO: pass not io.Discard - if err != nil { - // TODO: log error - return fmt.Errorf("error while preparing deploy source data (%v)", err) - } - runningDS = rds.ToPluginDeploySource() - targetDSP := deploysource.NewProvider( filepath.Join(p.workingDir, "target-deploysource"), deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "target", p.deployment.Trigger.Commit.Hash), - p.deployment.GetGitPath(), nil, // TODO: pass secret decrypter? + p.deployment.GetGitPath(), + p.secretDecrypter, ) - tds, err := targetDSP.Get(ctx, io.Discard) // TODO: pass not io.Discard + tds, err := targetDSP.Get(ctx, io.Discard) if err != nil { - // TODO: log error - return fmt.Errorf("error while preparing deploy source data (%v)", err) + p.logger.Error("error while preparing target deploy source data", zap.Error(err)) + return err } targetDS = tds.ToPluginDeploySource() - // TODO: Pass running DS as well if need? + if p.lastSuccessfulCommitHash != "" { + runningDSP := deploysource.NewProvider( + filepath.Join(p.workingDir, "running-deploysource"), + deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "running", p.lastSuccessfulCommitHash), + p.deployment.GetGitPath(), + p.secretDecrypter, + ) + rds, err := runningDSP.Get(ctx, io.Discard) + if err != nil { + p.logger.Error("error while preparing running deploy source data", zap.Error(err)) + return err + } + runningDS = rds.ToPluginDeploySource() + } + out, err := p.buildPlan(ctx, runningDS, targetDS) // If the deployment was already cancelled, we ignore the plan result. diff --git a/pkg/app/pipedv1/controller/scheduler.go b/pkg/app/pipedv1/controller/scheduler.go index 0fbd3d28bc..5d6e2ce42f 100644 --- a/pkg/app/pipedv1/controller/scheduler.go +++ b/pkg/app/pipedv1/controller/scheduler.go @@ -45,10 +45,11 @@ type scheduler struct { stageBasedPluginsMap map[string]pluginapi.PluginClient - apiClient apiClient - gitClient gitClient - metadataStore metadatastore.MetadataStore - notifier notifier + apiClient apiClient + gitClient gitClient + metadataStore metadatastore.MetadataStore + notifier notifier + secretDecrypter secretDecrypter targetDSP deploysource.Provider runningDSP deploysource.Provider @@ -80,6 +81,7 @@ func newScheduler( gitClient gitClient, stageBasedPluginsMap map[string]pluginapi.PluginClient, notifier notifier, + secretsDecrypter secretDecrypter, logger *zap.Logger, tracerProvider trace.TracerProvider, ) *scheduler { @@ -99,6 +101,7 @@ func newScheduler( gitClient: gitClient, metadataStore: metadatastore.NewMetadataStore(apiClient, d), notifier: notifier, + secretDecrypter: secretsDecrypter, doneDeploymentStatus: d.Status, cancelledCh: make(chan *model.ReportableCommand, 1), logger: logger, @@ -165,7 +168,7 @@ func (s *scheduler) Cancel(cmd model.ReportableCommand) { } // Run starts running the scheduler. -// It determines what stage should be executed next by which executor. +// It determines what stage should be executed next by which plugin. // The returning error does not mean that the pipeline was failed, // but it means that the scheduler could not finish its job normally. func (s *scheduler) Run(ctx context.Context) error { @@ -193,7 +196,7 @@ func (s *scheduler) Run(ctx context.Context) error { } controllermetrics.UpdateDeploymentStatus(s.deployment, model.DeploymentStatus_DEPLOYMENT_RUNNING) - // notify the deployment started event + // Notify the deployment started event users, groups, err := s.getApplicationNotificationMentions(model.NotificationEventType_EVENT_DEPLOYMENT_STARTED) if err != nil { s.logger.Error("failed to get the list of users or groups", zap.Error(err)) @@ -223,16 +226,20 @@ func (s *scheduler) Run(ctx context.Context) error { Branch: s.deployment.GitPath.Repo.Branch, } - s.runningDSP = deploysource.NewProvider( - filepath.Join(s.workingDir, "running-deploysource"), - deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "running", s.deployment.RunningCommitHash), - s.deployment.GetGitPath(), nil, // TODO: pass secret decrypter? - ) + if s.deployment.RunningCommitHash != "" { + s.runningDSP = deploysource.NewProvider( + filepath.Join(s.workingDir, "running-deploysource"), + deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "running", s.deployment.RunningCommitHash), + s.deployment.GetGitPath(), + s.secretDecrypter, + ) + } s.targetDSP = deploysource.NewProvider( filepath.Join(s.workingDir, "target-deploysource"), deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "target", s.deployment.Trigger.Commit.Hash), - s.deployment.GetGitPath(), nil, // TODO: pass secret decrypter? + s.deployment.GetGitPath(), + s.secretDecrypter, ) ds, err := s.targetDSP.Get(ctx, io.Discard) @@ -469,13 +476,13 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final rds, err := s.runningDSP.Get(ctx, io.Discard) if err != nil { - s.logger.Error("failed to get running deployment source", zap.Error(err)) + s.logger.Error("failed to get running deployment source", zap.String("stage-name", ps.Name), zap.Error(err)) return model.StageStatus_STAGE_FAILURE } tds, err := s.targetDSP.Get(ctx, io.Discard) if err != nil { - s.logger.Error("failed to get target deployment source", zap.Error(err)) + s.logger.Error("failed to get target deployment source", zap.String("stage-name", ps.Name), zap.Error(err)) return model.StageStatus_STAGE_FAILURE } @@ -508,8 +515,7 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final // Find the executor plugin for this stage. plugin, ok := s.stageBasedPluginsMap[ps.Name] if !ok { - err := fmt.Errorf("no registered plugin that can perform for stage %s", ps.Name) - s.logger.Error(err.Error()) + s.logger.Error("failed to find the plugin for the stage", zap.String("stage-name", ps.Name)) s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires) return model.StageStatus_STAGE_FAILURE } @@ -517,7 +523,7 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final // Load the stage configuration. stageConfig, stageConfigFound := s.genericApplicationConfig.GetStageByte(ps.Index) if !stageConfigFound { - s.logger.Error("Unable to find the stage configuration") + s.logger.Error("Unable to find the stage configuration", zap.String("stage-name", ps.Name)) if err := s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires); err != nil { s.logger.Error("failed to report stage status", zap.Error(err)) } @@ -535,7 +541,7 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final }, }) if err != nil { - s.logger.Error("failed to execute stage", zap.Error(err)) + s.logger.Error("failed to execute stage", zap.String("stage-name", ps.Name), zap.Error(err)) s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires) return model.StageStatus_STAGE_FAILURE }