Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC pipedv1 starts plugins #5394

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 86 additions & 7 deletions pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/pprof"
"os"
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

secretmanager "cloud.google.com/go/secretmanager/apiv1"
Expand Down Expand Up @@ -64,7 +68,9 @@
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/crypto"
"github.com/pipe-cd/pipecd/pkg/git"
"github.com/pipe-cd/pipecd/pkg/lifecycle"
"github.com/pipe-cd/pipecd/pkg/model"
pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1"
"github.com/pipe-cd/pipecd/pkg/rpc"
"github.com/pipe-cd/pipecd/pkg/rpc/rpcauth"
"github.com/pipe-cd/pipecd/pkg/rpc/rpcclient"
Expand All @@ -86,6 +92,7 @@
adminPort int
pluginServicePort int
toolsDir string
pluginsDir string
enableDefaultKubernetesCloudProvider bool
gracePeriod time.Duration
addLoginUserToPasswd bool
Expand All @@ -102,6 +109,7 @@
adminPort: 9085,
pluginServicePort: 9087,
toolsDir: path.Join(home, ".piped", "tools"),
pluginsDir: path.Join(home, ".piped", "plugins"),

Check warning on line 112 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L112

Added line #L112 was not covered by tests
gracePeriod: 30 * time.Second,
maxRecvMsgSize: 1024 * 1024 * 10, // 10MB
}
Expand Down Expand Up @@ -168,19 +176,20 @@
return err
}

// Make gRPC client and connect to the API.
// Make gRPC client and connect to the Control Plane API.
apiClient, err := p.createAPIClient(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey, input.Logger)
if err != nil {
input.Logger.Error("failed to create gRPC client to control plane", zap.Error(err))
return err
}

// Setup the tracer provider.
// We don't set the global tracer provider because 3rd-party library may use the global one.
tracerProvider, err := p.createTracerProvider(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey)
if err != nil {
input.Logger.Error("failed to create tracer provider", zap.Error(err))
return err
}
// we don't set the global tracer provider because 3rd-party library may use the global one.

// Send the newest piped meta to the control-plane.
if err := p.sendPipedMeta(ctx, apiClient, cfg, input.Logger); err != nil {
Expand Down Expand Up @@ -289,11 +298,6 @@
eventLister = store.Lister()
}

// Start running application live state reporter.
{
// TODO: Implement the live state reporter controller.
}

// Start running plugin service server.
{
var (
Expand All @@ -317,6 +321,51 @@
})
}

// Start plugins that registered in the configuration.
{
// Start all plugins and keep their commands to stop them later.
plugins, err := p.runPlugins(ctx, cfg.Plugins, input.Logger)
if err != nil {
input.Logger.Error("failed to run plugins", zap.Error(err))
return err
}

Check warning on line 331 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L325-L331

Added lines #L325 - L331 were not covered by tests

group.Go(func() error {
<-ctx.Done()
wg := &sync.WaitGroup{}
for _, plg := range plugins {
wg.Add(1)
go func() {
defer wg.Done()
if err := plg.GracefulStop(p.gracePeriod); err != nil {
input.Logger.Error("failed to stop plugin", zap.Error(err))
}

Check warning on line 342 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L333-L342

Added lines #L333 - L342 were not covered by tests
}()
}
wg.Wait()
return nil

Check warning on line 346 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L345-L346

Added lines #L345 - L346 were not covered by tests
})
}

// Make grpc clients to connect to plugins.
pluginClis := make([]pluginapi.PluginClient, 0, len(cfg.Plugins))
options := []rpcclient.DialOption{
rpcclient.WithBlock(),
rpcclient.WithInsecure(),
}
for _, plg := range cfg.Plugins {
cli, err := pluginapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(plg.Port)), options...)
if err != nil {
input.Logger.Error("failed to create client to connect plugin", zap.String("plugin", plg.Name), zap.Error(err))
}
pluginClis = append(pluginClis, cli)

Check warning on line 361 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L351-L361

Added lines #L351 - L361 were not covered by tests
}

// Start running application live state reporter.
{
// TODO: Implement the live state reporter controller.
}

Check warning on line 367 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L365-L367

Added lines #L365 - L367 were not covered by tests

// Start running application application drift detector.
{
// TODO: Implement the drift detector controller.
Expand All @@ -327,6 +376,7 @@
c := controller.NewController(
apiClient,
gitClient,
pluginClis,

Check warning on line 379 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L379

Added line #L379 was not covered by tests
deploymentLister,
commandLister,
notifier,
Expand Down Expand Up @@ -588,6 +638,35 @@
return nil, fmt.Errorf("one of config-file, config-gcp-secret or config-aws-secret must be set")
}

func (p *piped) runPlugins(ctx context.Context, pluginsCfg []config.PipedPlugin, logger *zap.Logger) ([]*lifecycle.Command, error) {
plugins := make([]*lifecycle.Command, 0, len(pluginsCfg))
for _, pCfg := range pluginsCfg {
// Download plugin binary to piped's pluginsDir.
pPath, err := lifecycle.DownloadBinary(pCfg.URL, p.pluginsDir, pCfg.Name, logger)
if err != nil {
return nil, fmt.Errorf("failed to download plugin %s: %w", pCfg.Name, err)
}

Check warning on line 648 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L641-L648

Added lines #L641 - L648 were not covered by tests

// Build plugin's args.
args := make([]string, 0, 0)
args = append(args, "--piped-plugin-service", net.JoinHostPort("localhost", strconv.Itoa(p.pluginServicePort)))
b, err := json.Marshal(pCfg)
if err != nil {
return nil, fmt.Errorf("failed to prepare plugin %s config: %w", pCfg.Name, err)
}
args = append(args, "--config", string(b))

// Run the plugin binary.
cmd, err := lifecycle.RunBinary(ctx, pPath, args)
if err != nil {
return nil, fmt.Errorf("failed to run plugin %s: %w", pCfg.Name, err)
}

Check warning on line 663 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L651-L663

Added lines #L651 - L663 were not covered by tests

plugins = append(plugins, cmd)

Check warning on line 665 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L665

Added line #L665 was not covered by tests
}
return plugins, nil

Check warning on line 667 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L667

Added line #L667 was not covered by tests
}

// TODO: Remove this once the decryption task by plugin call to the plugin service is implemented.
func (p *piped) initializeSecretDecrypter(cfg *config.PipedSpec) (crypto.Decrypter, error) {
sm := cfg.SecretManagement
Expand Down
37 changes: 28 additions & 9 deletions pkg/app/pipedv1/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/git"
"github.com/pipe-cd/pipecd/pkg/model"
pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
)

type apiClient interface {
Expand Down Expand Up @@ -84,12 +86,15 @@

type controller struct {
apiClient apiClient
pluginRegistry PluginRegistry
gitClient gitClient
deploymentLister deploymentLister
commandLister commandLister
notifier notifier

// gRPC clients to communicate with plugins.
pluginClients []pluginapi.PluginClient
// Map from stage name to the plugin client.
stageBasedPluginsMap map[string]pluginapi.PluginClient
// Map from application ID to the planner
// of a pending deployment of that application.
planners map[string]*planner
Expand Down Expand Up @@ -121,6 +126,7 @@
func NewController(
apiClient apiClient,
gitClient gitClient,
pluginClients []pluginapi.PluginClient,
deploymentLister deploymentLister,
commandLister commandLister,
notifier notifier,
Expand All @@ -131,8 +137,8 @@

return &controller{
apiClient: apiClient,
pluginRegistry: DefaultPluginRegistry(),
gitClient: gitClient,
pluginClients: pluginClients,

Check warning on line 141 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L141

Added line #L141 was not covered by tests
deploymentLister: deploymentLister,
commandLister: commandLister,
notifier: notifier,
Expand Down Expand Up @@ -166,6 +172,23 @@
c.workspaceDir = dir
c.logger.Info(fmt.Sprintf("workspace directory was configured to %s", c.workspaceDir))

// Build the list of stages that can be handled by piped's plugins.
stagesBasedPluginsMap := make(map[string]pluginapi.PluginClient)
for _, plugin := range c.pluginClients {
resp, err := plugin.FetchDefinedStages(ctx, &deployment.FetchDefinedStagesRequest{})
if err != nil {
return err
}
for _, stage := range resp.GetStages() {
if _, ok := stagesBasedPluginsMap[stage]; ok {
c.logger.Error("duplicated stage name", zap.String("stage", stage))
return fmt.Errorf("duplicated stage name %s", stage)
}
stagesBasedPluginsMap[stage] = plugin

Check warning on line 187 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L175-L187

Added lines #L175 - L187 were not covered by tests
}
}
c.stageBasedPluginsMap = stagesBasedPluginsMap

Check warning on line 191 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L190-L191

Added lines #L190 - L191 were not covered by tests
ticker := time.NewTicker(c.syncInternal)
defer ticker.Stop()
c.logger.Info("start syncing planners and schedulers")
Expand Down Expand Up @@ -410,18 +433,13 @@
}
}

pluginClient, ok := c.pluginRegistry.Plugin(d.Kind)
if !ok {
logger.Error("no plugin client for the application kind", zap.String("kind", d.Kind.String()))
return nil, fmt.Errorf("no plugin client for the application kind %s", d.Kind.String())
}

planner := newPlanner(
d,
commitHash,
configFilename,
workingDir,
pluginClient,
c.pluginClients, // FIXME: Find a way to ensure the plugins only related to deployment.
c.stageBasedPluginsMap,

Check warning on line 442 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L441-L442

Added lines #L441 - L442 were not covered by tests
c.apiClient,
c.gitClient,
c.notifier,
Expand Down Expand Up @@ -561,6 +579,7 @@
workingDir,
c.apiClient,
c.gitClient,
c.stageBasedPluginsMap,

Check warning on line 582 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L582

Added line #L582 was not covered by tests
c.notifier,
c.logger,
c.tracerProvider,
Expand Down
16 changes: 4 additions & 12 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@
lastSuccessfulCommitHash string,
lastSuccessfulConfigFilename string,
workingDir string,
pluginClient pluginapi.PluginClient,
pluginClients []pluginapi.PluginClient,
stageBasedPluginsMap map[string]pluginapi.PluginClient,
apiClient apiClient,
gitClient gitClient,
notifier notifier,
Expand All @@ -106,22 +107,13 @@
zap.String("working-dir", workingDir),
)

// TODO: Fix this. Passed by args
tmp := make(map[string]pluginapi.PluginClient)
tmp["K8S_SYNC"] = pluginClient

plugins := make([]pluginapi.PluginClient, 0, len(tmp))
for _, v := range tmp {
plugins = append(plugins, v)
}

p := &planner{
deployment: d,
lastSuccessfulCommitHash: lastSuccessfulCommitHash,
lastSuccessfulConfigFilename: lastSuccessfulConfigFilename,
workingDir: workingDir,
stageBasedPluginsMap: tmp,
plugins: plugins,
stageBasedPluginsMap: stageBasedPluginsMap,
plugins: pluginClients,

Check warning on line 116 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L115-L116

Added lines #L115 - L116 were not covered by tests
apiClient: apiClient,
gitClient: gitClient,
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
Expand Down
78 changes: 0 additions & 78 deletions pkg/app/pipedv1/controller/pluginregistry.go

This file was deleted.

Loading
Loading