diff --git a/params/params.go b/params/params.go index e35cb298..2f1ea43e 100644 --- a/params/params.go +++ b/params/params.go @@ -150,8 +150,7 @@ type Repository struct { Pools []Pool `json:"pool,omitempty"` CredentialsName string `json:"credentials_name"` // Do not serialize sensitive info. - WebhookSecret string `json:"-"` - Internal Internal `json:"-"` + WebhookSecret string `json:"-"` } type Organization struct { @@ -160,8 +159,7 @@ type Organization struct { Pools []Pool `json:"pool,omitempty"` CredentialsName string `json:"credentials_name"` // Do not serialize sensitive info. - WebhookSecret string `json:"-"` - Internal Internal `json:"-"` + WebhookSecret string `json:"-"` } // Users holds information about a particular user @@ -200,5 +198,4 @@ type Provider struct { type UpdatePoolStateParams struct { WebhookSecret string - Internal Internal } diff --git a/params/requests.go b/params/requests.go index 1f51bfa4..f4d75781 100644 --- a/params/requests.go +++ b/params/requests.go @@ -57,7 +57,7 @@ type CreateOrgParams struct { func (c *CreateOrgParams) Validate() error { if c.Name == "" { - return errors.NewBadRequestError("missing repo name") + return errors.NewBadRequestError("missing org name") } if c.CredentialsName == "" { diff --git a/runner/interfaces.go b/runner/interfaces.go new file mode 100644 index 00000000..fa810e6b --- /dev/null +++ b/runner/interfaces.go @@ -0,0 +1,34 @@ +// Copyright 2022 Cloudbase Solutions SRL +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package runner + +import ( + "context" + dbCommon "garm/database/common" + "garm/params" + "garm/runner/common" +) + +type PoolManagerController interface { + CreateRepoPoolManager(ctx context.Context, repo params.Repository, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) + GetRepoPoolManager(repo params.Repository) (common.PoolManager, error) + DeleteRepoPoolManager(repo params.Repository) error + GetRepoPoolManagers() (map[string]common.PoolManager, error) + + CreateOrgPoolManager(ctx context.Context, org params.Organization, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) + GetOrgPoolManager(org params.Organization) (common.PoolManager, error) + DeleteOrgPoolManager(org params.Organization) error + GetOrgPoolManagers() (map[string]common.PoolManager, error) +} diff --git a/runner/organizations.go b/runner/organizations.go index b9be1573..e9f7e264 100644 --- a/runner/organizations.go +++ b/runner/organizations.go @@ -24,7 +24,6 @@ import ( runnerErrors "garm/errors" "garm/params" "garm/runner/common" - "garm/runner/pool" "github.com/pkg/errors" ) @@ -46,7 +45,7 @@ func (r *Runner) CreateOrganization(ctx context.Context, param params.CreateOrgP _, err = r.store.GetOrganization(ctx, param.Name) if err != nil { if !errors.Is(err, runnerErrors.ErrNotFound) { - return params.Organization{}, errors.Wrap(err, "fetching repo") + return params.Organization{}, errors.Wrap(err, "fetching org") } } else { return params.Organization{}, runnerErrors.NewConflictError("organization %s already exists", param.Name) @@ -63,11 +62,16 @@ func (r *Runner) CreateOrganization(ctx context.Context, param params.CreateOrgP } }() - poolMgr, err := r.loadOrgPoolManager(org) + poolMgr, err := r.poolManagerCtrl.CreateOrgPoolManager(r.ctx, org, r.providers, r.store) + if err != nil { + return params.Organization{}, errors.Wrap(err, "creating org pool manager") + } if err := poolMgr.Start(); err != nil { - return params.Organization{}, errors.Wrap(err, "starting pool manager") + if deleteErr := r.poolManagerCtrl.DeleteOrgPoolManager(org); deleteErr != nil { + log.Printf("failed to cleanup pool manager for org %s", org.ID) + } + return params.Organization{}, errors.Wrap(err, "starting org pool manager") } - r.organizations[org.ID] = poolMgr return org, nil } @@ -91,7 +95,7 @@ func (r *Runner) GetOrganizationByID(ctx context.Context, orgID string) (params. org, err := r.store.GetOrganizationByID(ctx, orgID) if err != nil { - return params.Organization{}, errors.Wrap(err, "fetching repository") + return params.Organization{}, errors.Wrap(err, "fetching organization") } return org, nil } @@ -103,20 +107,12 @@ func (r *Runner) DeleteOrganization(ctx context.Context, orgID string) error { org, err := r.store.GetOrganizationByID(ctx, orgID) if err != nil { - return errors.Wrap(err, "fetching repo") - } - - poolMgr, ok := r.organizations[org.ID] - if ok { - if err := poolMgr.Stop(); err != nil { - log.Printf("failed to stop pool for repo %s", org.ID) - } - delete(r.organizations, orgID) + return errors.Wrap(err, "fetching org") } pools, err := r.store.ListOrgPools(ctx, orgID) if err != nil { - return errors.Wrap(err, "fetching repo pools") + return errors.Wrap(err, "fetching org pools") } if len(pools) > 0 { @@ -125,7 +121,11 @@ func (r *Runner) DeleteOrganization(ctx context.Context, orgID string) error { poolIds = append(poolIds, pool.ID) } - return runnerErrors.NewBadRequestError("repo has pools defined (%s)", strings.Join(poolIds, ", ")) + return runnerErrors.NewBadRequestError("org has pools defined (%s)", strings.Join(poolIds, ", ")) + } + + if err := r.poolManagerCtrl.DeleteOrgPoolManager(org); err != nil { + return errors.Wrap(err, "deleting org pool manager") } if err := r.store.DeleteOrganization(ctx, orgID); err != nil { @@ -159,27 +159,19 @@ func (r *Runner) UpdateOrganization(ctx context.Context, orgID string, param par return params.Organization{}, errors.Wrap(err, "updating org") } - poolMgr, ok := r.organizations[org.ID] - if ok { - internalCfg, err := r.getInternalConfig(org.CredentialsName) - if err != nil { - return params.Organization{}, errors.Wrap(err, "fetching internal config") - } + poolMgr, err := r.poolManagerCtrl.GetOrgPoolManager(org) + if err != nil { newState := params.UpdatePoolStateParams{ WebhookSecret: org.WebhookSecret, - Internal: internalCfg, } - org.Internal = internalCfg // stop the pool mgr if err := poolMgr.RefreshState(newState); err != nil { - return params.Organization{}, errors.Wrap(err, "updating pool manager") + return params.Organization{}, errors.Wrap(err, "updating org pool manager") } } else { - poolMgr, err := r.loadOrgPoolManager(org) - if err != nil { - return params.Organization{}, errors.Wrap(err, "loading pool manager") + if _, err := r.poolManagerCtrl.CreateOrgPoolManager(r.ctx, org, r.providers, r.store); err != nil { + return params.Organization{}, errors.Wrap(err, "creating org pool manager") } - r.organizations[org.ID] = poolMgr } return org, nil @@ -193,8 +185,12 @@ func (r *Runner) CreateOrgPool(ctx context.Context, orgID string, param params.C r.mux.Lock() defer r.mux.Unlock() - _, ok := r.organizations[orgID] - if !ok { + org, err := r.store.GetOrganizationByID(ctx, orgID) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching org") + } + + if _, err := r.poolManagerCtrl.GetOrgPoolManager(org); err != nil { return params.Pool{}, runnerErrors.ErrNotFound } @@ -313,30 +309,18 @@ func (r *Runner) ListOrgInstances(ctx context.Context, orgID string) ([]params.I return instances, nil } -func (r *Runner) loadOrgPoolManager(org params.Organization) (common.PoolManager, error) { - cfg, err := r.getInternalConfig(org.CredentialsName) - if err != nil { - return nil, errors.Wrap(err, "fetching internal config") - } - org.Internal = cfg - poolManager, err := pool.NewOrganizationPoolManager(r.ctx, org, r.providers, r.store) - if err != nil { - return nil, errors.Wrap(err, "creating pool manager") - } - return poolManager, nil -} - func (r *Runner) findOrgPoolManager(name string) (common.PoolManager, error) { r.mux.Lock() defer r.mux.Unlock() org, err := r.store.GetOrganization(r.ctx, name) if err != nil { - return nil, errors.Wrap(err, "fetching repo") + return nil, errors.Wrap(err, "fetching org") } - if orgPoolMgr, ok := r.organizations[org.ID]; ok { - return orgPoolMgr, nil + poolManager, err := r.poolManagerCtrl.GetOrgPoolManager(org) + if err != nil { + return nil, errors.Wrap(err, "fetching pool manager for org") } - return nil, errors.Wrapf(runnerErrors.ErrNotFound, "organization %s not configured", name) + return poolManager, nil } diff --git a/runner/pool/organization.go b/runner/pool/organization.go index 00fa1686..3f984e53 100644 --- a/runner/pool/organization.go +++ b/runner/pool/organization.go @@ -33,25 +33,26 @@ import ( // test that we implement PoolManager var _ poolHelper = &organization{} -func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { - ghc, err := util.GithubClient(ctx, cfg.Internal.OAuth2Token) +func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { + ghc, err := util.GithubClient(ctx, cfgInternal.OAuth2Token) if err != nil { return nil, errors.Wrap(err, "getting github client") } helper := &organization{ - cfg: cfg, - ctx: ctx, - ghcli: ghc, - id: cfg.ID, - store: store, + cfg: cfg, + cfgInternal: cfgInternal, + ctx: ctx, + ghcli: ghc, + id: cfg.ID, + store: store, } repo := &basePool{ ctx: ctx, store: store, providers: providers, - controllerID: cfg.Internal.ControllerID, + controllerID: cfgInternal.ControllerID, quit: make(chan struct{}), done: make(chan struct{}), helper: helper, @@ -60,11 +61,12 @@ func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, pr } type organization struct { - cfg params.Organization - ctx context.Context - ghcli common.GithubClient - id string - store dbCommon.Store + cfg params.Organization + cfgInternal params.Internal + ctx context.Context + ghcli common.GithubClient + id string + store dbCommon.Store mux sync.Mutex } @@ -74,7 +76,6 @@ func (r *organization) UpdateState(param params.UpdatePoolStateParams) error { defer r.mux.Unlock() r.cfg.WebhookSecret = param.WebhookSecret - r.cfg.Internal = param.Internal ghc, err := util.GithubClient(r.ctx, r.GetGithubToken()) if err != nil { @@ -85,7 +86,7 @@ func (r *organization) UpdateState(param params.UpdatePoolStateParams) error { } func (r *organization) GetGithubToken() string { - return r.cfg.Internal.OAuth2Token + return r.cfgInternal.OAuth2Token } func (r *organization) GetGithubRunners() ([]*github.Runner, error) { @@ -129,7 +130,7 @@ func (r *organization) GithubURL() string { } func (r *organization) JwtToken() string { - return r.cfg.Internal.JWTSecret + return r.cfgInternal.JWTSecret } func (r *organization) GetGithubRegistrationToken() (string, error) { @@ -150,7 +151,7 @@ func (r *organization) WebhookSecret() string { } func (r *organization) GetCallbackURL() string { - return r.cfg.Internal.InstanceCallbackURL + return r.cfgInternal.InstanceCallbackURL } func (r *organization) FindPoolByTags(labels []string) (params.Pool, error) { diff --git a/runner/pool/repository.go b/runner/pool/repository.go index 1c2d80e2..b17216f4 100644 --- a/runner/pool/repository.go +++ b/runner/pool/repository.go @@ -33,25 +33,26 @@ import ( // test that we implement PoolManager var _ poolHelper = &repository{} -func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { - ghc, err := util.GithubClient(ctx, cfg.Internal.OAuth2Token) +func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { + ghc, err := util.GithubClient(ctx, cfgInternal.OAuth2Token) if err != nil { return nil, errors.Wrap(err, "getting github client") } helper := &repository{ - cfg: cfg, - ctx: ctx, - ghcli: ghc, - id: cfg.ID, - store: store, + cfg: cfg, + cfgInternal: cfgInternal, + ctx: ctx, + ghcli: ghc, + id: cfg.ID, + store: store, } repo := &basePool{ ctx: ctx, store: store, providers: providers, - controllerID: cfg.Internal.ControllerID, + controllerID: cfgInternal.ControllerID, quit: make(chan struct{}), done: make(chan struct{}), helper: helper, @@ -62,11 +63,12 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, provid var _ poolHelper = &repository{} type repository struct { - cfg params.Repository - ctx context.Context - ghcli common.GithubClient - id string - store dbCommon.Store + cfg params.Repository + cfgInternal params.Internal + ctx context.Context + ghcli common.GithubClient + id string + store dbCommon.Store mux sync.Mutex } @@ -76,7 +78,6 @@ func (r *repository) UpdateState(param params.UpdatePoolStateParams) error { defer r.mux.Unlock() r.cfg.WebhookSecret = param.WebhookSecret - r.cfg.Internal = param.Internal ghc, err := util.GithubClient(r.ctx, r.GetGithubToken()) if err != nil { @@ -87,7 +88,7 @@ func (r *repository) UpdateState(param params.UpdatePoolStateParams) error { } func (r *repository) GetGithubToken() string { - return r.cfg.Internal.OAuth2Token + return r.cfgInternal.OAuth2Token } func (r *repository) GetGithubRunners() ([]*github.Runner, error) { @@ -131,7 +132,7 @@ func (r *repository) GithubURL() string { } func (r *repository) JwtToken() string { - return r.cfg.Internal.JWTSecret + return r.cfgInternal.JWTSecret } func (r *repository) GetGithubRegistrationToken() (string, error) { @@ -152,7 +153,7 @@ func (r *repository) WebhookSecret() string { } func (r *repository) GetCallbackURL() string { - return r.cfg.Internal.InstanceCallbackURL + return r.cfgInternal.InstanceCallbackURL } func (r *repository) FindPoolByTags(labels []string) (params.Pool, error) { diff --git a/runner/repositories.go b/runner/repositories.go index f8634439..3cc11aad 100644 --- a/runner/repositories.go +++ b/runner/repositories.go @@ -24,7 +24,6 @@ import ( runnerErrors "garm/errors" "garm/params" "garm/runner/common" - "garm/runner/pool" "github.com/pkg/errors" ) @@ -63,11 +62,16 @@ func (r *Runner) CreateRepository(ctx context.Context, param params.CreateRepoPa } }() - poolMgr, err := r.loadRepoPoolManager(repo) + poolMgr, err := r.poolManagerCtrl.CreateRepoPoolManager(r.ctx, repo, r.providers, r.store) + if err != nil { + return params.Repository{}, errors.Wrap(err, "creating repo pool manager") + } if err := poolMgr.Start(); err != nil { - return params.Repository{}, errors.Wrap(err, "starting pool manager") + if deleteErr := r.poolManagerCtrl.DeleteRepoPoolManager(repo); deleteErr != nil { + log.Printf("failed to cleanup pool manager for repo %s", repo.ID) + } + return params.Repository{}, errors.Wrap(err, "starting repo pool manager") } - r.repositories[repo.ID] = poolMgr return repo, nil } @@ -106,14 +110,6 @@ func (r *Runner) DeleteRepository(ctx context.Context, repoID string) error { return errors.Wrap(err, "fetching repo") } - poolMgr, ok := r.repositories[repo.ID] - if ok { - if err := poolMgr.Stop(); err != nil { - log.Printf("failed to stop pool for repo %s", repo.ID) - } - delete(r.repositories, repoID) - } - pools, err := r.store.ListRepoPools(ctx, repoID) if err != nil { return errors.Wrap(err, "fetching repo pools") @@ -128,6 +124,10 @@ func (r *Runner) DeleteRepository(ctx context.Context, repoID string) error { return runnerErrors.NewBadRequestError("repo has pools defined (%s)", strings.Join(poolIds, ", ")) } + if err := r.poolManagerCtrl.DeleteRepoPoolManager(repo); err != nil { + return errors.Wrap(err, "deleting repo pool manager") + } + if err := r.store.DeleteRepository(ctx, repoID); err != nil { return errors.Wrap(err, "removing repository") } @@ -159,27 +159,19 @@ func (r *Runner) UpdateRepository(ctx context.Context, repoID string, param para return params.Repository{}, errors.Wrap(err, "updating repo") } - poolMgr, ok := r.repositories[repo.ID] - if ok { - internalCfg, err := r.getInternalConfig(repo.CredentialsName) - if err != nil { - return params.Repository{}, errors.Wrap(err, "fetching internal config") - } + poolMgr, err := r.poolManagerCtrl.GetRepoPoolManager(repo) + if err != nil { newState := params.UpdatePoolStateParams{ WebhookSecret: repo.WebhookSecret, - Internal: internalCfg, } - repo.Internal = internalCfg // stop the pool mgr if err := poolMgr.RefreshState(newState); err != nil { - return params.Repository{}, errors.Wrap(err, "updating pool manager") + return params.Repository{}, errors.Wrap(err, "updating repo pool manager") } } else { - poolMgr, err := r.loadRepoPoolManager(repo) - if err != nil { - return params.Repository{}, errors.Wrap(err, "loading pool manager") + if _, err := r.poolManagerCtrl.CreateRepoPoolManager(r.ctx, repo, r.providers, r.store); err != nil { + return params.Repository{}, errors.Wrap(err, "creating repo pool manager") } - r.repositories[repo.ID] = poolMgr } return repo, nil @@ -193,8 +185,12 @@ func (r *Runner) CreateRepoPool(ctx context.Context, repoID string, param params r.mux.Lock() defer r.mux.Unlock() - _, ok := r.repositories[repoID] - if !ok { + repo, err := r.store.GetRepositoryByID(ctx, repoID) + if err != nil { + return params.Pool{}, errors.Wrap(err, "fetching repo") + } + + if _, err := r.poolManagerCtrl.GetRepoPoolManager(repo); err != nil { return params.Pool{}, runnerErrors.ErrNotFound } @@ -324,19 +320,6 @@ func (r *Runner) ListRepoInstances(ctx context.Context, repoID string) ([]params return instances, nil } -func (r *Runner) loadRepoPoolManager(repo params.Repository) (common.PoolManager, error) { - cfg, err := r.getInternalConfig(repo.CredentialsName) - if err != nil { - return nil, errors.Wrap(err, "fetching internal config") - } - repo.Internal = cfg - poolManager, err := pool.NewRepositoryPoolManager(r.ctx, repo, r.providers, r.store) - if err != nil { - return nil, errors.Wrap(err, "creating pool manager") - } - return poolManager, nil -} - func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, error) { r.mux.Lock() defer r.mux.Unlock() @@ -346,8 +329,9 @@ func (r *Runner) findRepoPoolManager(owner, name string) (common.PoolManager, er return nil, errors.Wrap(err, "fetching repo") } - if repo, ok := r.repositories[repo.ID]; ok { - return repo, nil + poolManager, err := r.poolManagerCtrl.GetRepoPoolManager(repo) + if err != nil { + return nil, errors.Wrap(err, "fetching pool manager for repo") } - return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s not configured", owner, name) + return poolManager, nil } diff --git a/runner/runner.go b/runner/runner.go index 48c3309e..8b681aa4 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -37,6 +37,7 @@ import ( runnerErrors "garm/errors" "garm/params" "garm/runner/common" + "garm/runner/pool" "garm/runner/providers" providerCommon "garm/runner/providers/common" "garm/util" @@ -66,15 +67,21 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) { for _, ghcreds := range cfg.Github { creds[ghcreds.Name] = ghcreds } - runner := &Runner{ - ctx: ctx, + + poolManagerCtrl := &poolManagerCtrl{ + controllerID: ctrlId.ControllerID.String(), config: cfg, - store: db, + credentials: creds, repositories: map[string]common.PoolManager{}, organizations: map[string]common.PoolManager{}, - providers: providers, - controllerID: ctrlId.ControllerID.String(), - credentials: creds, + } + runner := &Runner{ + ctx: ctx, + config: cfg, + store: db, + poolManagerCtrl: poolManagerCtrl, + providers: providers, + credentials: creds, } if err := runner.loadReposAndOrgs(); err != nil { @@ -84,18 +91,124 @@ func NewRunner(ctx context.Context, cfg config.Config) (*Runner, error) { return runner, nil } -type Runner struct { +type poolManagerCtrl struct { mux sync.Mutex - config config.Config controllerID string - ctx context.Context - store dbCommon.Store + config config.Config + credentials map[string]config.Github repositories map[string]common.PoolManager organizations map[string]common.PoolManager - providers map[string]common.Provider - credentials map[string]config.Github +} + +func (p *poolManagerCtrl) CreateRepoPoolManager(ctx context.Context, repo params.Repository, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { + p.mux.Lock() + defer p.mux.Unlock() + + cfgInternal, err := p.getInternalConfig(repo.CredentialsName) + if err != nil { + return nil, errors.Wrap(err, "fetching internal config") + } + poolManager, err := pool.NewRepositoryPoolManager(ctx, repo, cfgInternal, providers, store) + if err != nil { + return nil, errors.Wrap(err, "creating repo pool manager") + } + p.repositories[repo.ID] = poolManager + return poolManager, nil +} + +func (p *poolManagerCtrl) GetRepoPoolManager(repo params.Repository) (common.PoolManager, error) { + if repoPoolMgr, ok := p.repositories[repo.ID]; ok { + return repoPoolMgr, nil + } + return nil, errors.Wrapf(runnerErrors.ErrNotFound, "repository %s/%s pool manager not loaded", repo.Owner, repo.Name) +} + +func (p *poolManagerCtrl) DeleteRepoPoolManager(repo params.Repository) error { + p.mux.Lock() + defer p.mux.Unlock() + + poolMgr, ok := p.repositories[repo.ID] + if ok { + if err := poolMgr.Stop(); err != nil { + return errors.Wrap(err, "stopping repo pool manager") + } + delete(p.repositories, repo.ID) + } + return nil +} + +func (p *poolManagerCtrl) GetRepoPoolManagers() (map[string]common.PoolManager, error) { + return p.repositories, nil +} + +func (p *poolManagerCtrl) CreateOrgPoolManager(ctx context.Context, org params.Organization, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) { + p.mux.Lock() + defer p.mux.Unlock() + + cfgInternal, err := p.getInternalConfig(org.CredentialsName) + if err != nil { + return nil, errors.Wrap(err, "fetching internal config") + } + poolManager, err := pool.NewOrganizationPoolManager(ctx, org, cfgInternal, providers, store) + if err != nil { + return nil, errors.Wrap(err, "creating org pool manager") + } + p.organizations[org.ID] = poolManager + return poolManager, nil +} + +func (p *poolManagerCtrl) GetOrgPoolManager(org params.Organization) (common.PoolManager, error) { + if orgPoolMgr, ok := p.organizations[org.ID]; ok { + return orgPoolMgr, nil + } + return nil, errors.Wrapf(runnerErrors.ErrNotFound, "organization %s pool manager not loaded", org.Name) +} + +func (p *poolManagerCtrl) DeleteOrgPoolManager(org params.Organization) error { + p.mux.Lock() + defer p.mux.Unlock() + + poolMgr, ok := p.organizations[org.ID] + if ok { + if err := poolMgr.Stop(); err != nil { + return errors.Wrap(err, "stopping org pool manager") + } + delete(p.organizations, org.ID) + } + return nil +} + +func (p *poolManagerCtrl) GetOrgPoolManagers() (map[string]common.PoolManager, error) { + return p.organizations, nil +} + +func (p *poolManagerCtrl) getInternalConfig(credsName string) (params.Internal, error) { + creds, ok := p.credentials[credsName] + if !ok { + return params.Internal{}, runnerErrors.NewBadRequestError("invalid credential name (%s)", credsName) + } + + return params.Internal{ + OAuth2Token: creds.OAuth2Token, + ControllerID: p.controllerID, + InstanceCallbackURL: p.config.Default.CallbackURL, + JWTSecret: p.config.JWTAuth.Secret, + }, nil +} + +type Runner struct { + mux sync.Mutex + + config config.Config + ctx context.Context + store dbCommon.Store + + poolManagerCtrl PoolManagerController + + providers map[string]common.Provider + credentials map[string]config.Github } func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredentials, error) { @@ -119,20 +232,6 @@ func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) { return ret, nil } -func (r *Runner) getInternalConfig(credsName string) (params.Internal, error) { - creds, ok := r.credentials[credsName] - if !ok { - return params.Internal{}, runnerErrors.NewBadRequestError("invalid credential name (%s)", credsName) - } - - return params.Internal{ - OAuth2Token: creds.OAuth2Token, - ControllerID: r.controllerID, - InstanceCallbackURL: r.config.Default.CallbackURL, - JWTSecret: r.config.JWTAuth.Secret, - }, nil -} - func (r *Runner) loadReposAndOrgs() error { r.mux.Lock() defer r.mux.Unlock() @@ -144,48 +243,36 @@ func (r *Runner) loadReposAndOrgs() error { orgs, err := r.store.ListOrganizations(r.ctx) if err != nil { - return errors.Wrap(err, "fetching repositories") + return errors.Wrap(err, "fetching organizations") } expectedReplies := len(repos) + len(orgs) - repoPoolMgrChan := make(chan common.PoolManager, len(repos)) - orgPoolMgrChan := make(chan common.PoolManager, len(orgs)) errChan := make(chan error, expectedReplies) for _, repo := range repos { go func(repo params.Repository) { - log.Printf("creating pool manager for %s/%s", repo.Owner, repo.Name) - poolManager, err := r.loadRepoPoolManager(repo) - if err != nil { - errChan <- err - return - } - repoPoolMgrChan <- poolManager + log.Printf("creating pool manager for repo %s/%s", repo.Owner, repo.Name) + _, err := r.poolManagerCtrl.CreateRepoPoolManager(r.ctx, repo, r.providers, r.store) + errChan <- err }(repo) } for _, org := range orgs { go func(org params.Organization) { log.Printf("creating pool manager for organization %s", org.Name) - poolManager, err := r.loadOrgPoolManager(org) - if err != nil { - errChan <- err - return - } - orgPoolMgrChan <- poolManager + _, err := r.poolManagerCtrl.CreateOrgPoolManager(r.ctx, org, r.providers, r.store) + errChan <- err }(org) } for i := 0; i < expectedReplies; i++ { select { - case repoPool := <-repoPoolMgrChan: - r.repositories[repoPool.ID()] = repoPool - case orgPool := <-orgPoolMgrChan: - r.organizations[orgPool.ID()] = orgPool case err := <-errChan: - return errors.Wrap(err, "failed to load repos and pools") + if err != nil { + return errors.Wrap(err, "failed to load pool managers for repos and orgs") + } case <-time.After(60 * time.Second): - return fmt.Errorf("timed out waiting for pool mamager load") + return fmt.Errorf("timed out waiting for pool manager load") } } @@ -196,10 +283,20 @@ func (r *Runner) Start() error { r.mux.Lock() defer r.mux.Unlock() - expectedReplies := len(r.repositories) + len(r.organizations) + repositories, err := r.poolManagerCtrl.GetRepoPoolManagers() + if err != nil { + return errors.Wrap(err, "fetch repo pool managers") + } + + organizations, err := r.poolManagerCtrl.GetOrgPoolManagers() + if err != nil { + return errors.Wrap(err, "fetch org pool managers") + } + + expectedReplies := len(repositories) + len(organizations) errChan := make(chan error, expectedReplies) - for _, repo := range r.repositories { + for _, repo := range repositories { go func(repo common.PoolManager) { err := repo.Start() errChan <- err @@ -207,7 +304,7 @@ func (r *Runner) Start() error { }(repo) } - for _, org := range r.organizations { + for _, org := range organizations { go func(org common.PoolManager) { err := org.Start() errChan <- err @@ -232,13 +329,21 @@ func (r *Runner) Stop() error { r.mux.Lock() defer r.mux.Unlock() - for _, repo := range r.repositories { + repos, err := r.poolManagerCtrl.GetRepoPoolManagers() + if err != nil { + return errors.Wrap(err, "fetch repo pool managers") + } + for _, repo := range repos { if err := repo.Stop(); err != nil { return errors.Wrap(err, "stopping repo pool manager") } } - for _, org := range r.organizations { + orgs, err := r.poolManagerCtrl.GetOrgPoolManagers() + if err != nil { + return errors.Wrap(err, "fetch org pool managers") + } + for _, org := range orgs { if err := org.Stop(); err != nil { return errors.Wrap(err, "stopping org pool manager") } @@ -252,7 +357,11 @@ func (r *Runner) Wait() error { var wg sync.WaitGroup - for poolId, repo := range r.repositories { + repos, err := r.poolManagerCtrl.GetRepoPoolManagers() + if err != nil { + return errors.Wrap(err, "fetch repo pool managers") + } + for poolId, repo := range repos { wg.Add(1) go func(id string, poolMgr common.PoolManager) { defer wg.Done() @@ -262,7 +371,11 @@ func (r *Runner) Wait() error { }(poolId, repo) } - for poolId, org := range r.organizations { + orgs, err := r.poolManagerCtrl.GetOrgPoolManagers() + if err != nil { + return errors.Wrap(err, "fetch org pool managers") + } + for poolId, org := range orgs { wg.Add(1) go func(id string, poolMgr common.PoolManager) { defer wg.Done() @@ -320,7 +433,7 @@ func (r *Runner) validateHookBody(signature, secret string, body []byte) error { } func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData []byte) error { - if jobData == nil || len(jobData) == 0 { + if len(jobData) == 0 { return runnerErrors.NewBadRequestError("missing job data") } @@ -480,7 +593,7 @@ func (r *Runner) ListAllInstances(ctx context.Context) ([]params.Instance, error instances, err := r.store.ListAllInstances(ctx) if err != nil { - return nil, errors.Wrap(err, "fetcing instances") + return nil, errors.Wrap(err, "fetching instances") } return instances, nil }