Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] feat(concurrent): enabling concurrent reconciles for the applications… #11881

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions applicationset/controllers/applicationset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -72,6 +73,7 @@ type ApplicationSetReconciler struct {
utils.Renderer

EnableProgressiveRollouts bool
MaxConcurrentReconciles int
}

// +kubebuilder:rbac:groups=argoproj.io,resources=applicationsets,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -540,6 +542,9 @@ func (r *ApplicationSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
Client: mgr.GetClient(),
Log: log.WithField("type", "createSecretEventHandler"),
}).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
}).
// TODO: also watch Applications and respond on changes if we own them.
Complete(r)
}
Expand Down
8 changes: 7 additions & 1 deletion applicationset/services/repo_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"path/filepath"
"strings"

"github.com/argoproj/argo-cd/v2/applicationset/utils"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/git"
Expand All @@ -22,6 +24,7 @@ type argoCDService struct {
repositoriesDB RepositoryDB
storecreds git.CredsStore
submoduleEnabled bool
keyLock *utils.KeyLock
}

type Repos interface {
Expand All @@ -33,12 +36,13 @@ type Repos interface {
GetDirectories(ctx context.Context, repoURL string, revision string) ([]string, error)
}

func NewArgoCDService(db db.ArgoDB, gitCredStore git.CredsStore, submoduleEnabled bool) Repos {
func NewArgoCDService(db db.ArgoDB, gitCredStore git.CredsStore, submoduleEnabled bool, keyLock *utils.KeyLock) Repos {

return &argoCDService{
repositoriesDB: db.(RepositoryDB),
storecreds: gitCredStore,
submoduleEnabled: submoduleEnabled,
keyLock: keyLock,
}
}

Expand All @@ -54,6 +58,7 @@ func (a *argoCDService) GetFiles(ctx context.Context, repoURL string, revision s
return nil, err
}

defer a.keyLock.Lock(repoURL)()
Copy link
Member Author

@rumstead rumstead Jan 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Locking on the repo URL doesnt help too much if you have a monorepo where the config files for your app live. It would make more sense to checkout and lock based on something like <app set name>/<repo URL>/

err = checkoutRepo(gitRepoClient, revision, a.submoduleEnabled)
if err != nil {
return nil, err
Expand Down Expand Up @@ -88,6 +93,7 @@ func (a *argoCDService) GetDirectories(ctx context.Context, repoURL string, revi
return nil, fmt.Errorf("error creating a new git client: %w", err)
}

defer a.keyLock.Lock(repoURL)()
err = checkoutRepo(gitRepoClient, revision, a.submoduleEnabled)
if err != nil {
return nil, fmt.Errorf("error while checking out repo: %w", err)
Expand Down
139 changes: 137 additions & 2 deletions applicationset/services/repo_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ import (
"context"
"fmt"
"sort"
"sync"
"testing"

"github.com/argoproj/argo-cd/v2/applicationset/utils"
"github.com/argoproj/argo-cd/v2/util/git"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
)

const argocdExampleAppCommitID = "08f72e2a309beab929d9fd14626071b1a61a47f9"

type ArgocdRepositoryMock struct {
mock *mock.Mock
}
Expand All @@ -29,7 +34,7 @@ func TestGetDirectories(t *testing.T) {
// Author: Alexander Matyushentsev <Alexander_Matyushentsev@intuit.com>
// Date: Sun Jan 31 09:54:53 2021 -0800
// chore: downgrade kustomize guestbook image tag (#73)
exampleRepoRevision := "08f72e2a309beab929d9fd14626071b1a61a47f9"
exampleRepoRevision := argocdExampleAppCommitID

for _, c := range []struct {
name string
Expand Down Expand Up @@ -85,6 +90,7 @@ func TestGetDirectories(t *testing.T) {

argocd := argoCDService{
repositoriesDB: argocdRepositoryMock,
keyLock: utils.NewKeyLock(),
}

got, err := argocd.GetDirectories(context.TODO(), cc.repoURL, cc.revision)
Expand All @@ -106,7 +112,7 @@ func TestGetFiles(t *testing.T) {

// Hardcode a specific commit, so that changes to argoproj/argocd-example-apps/ don't break our tests
// "chore: downgrade kustomize guestbook image tag (#73)"
commitID := "08f72e2a309beab929d9fd14626071b1a61a47f9"
commitID := argocdExampleAppCommitID

tests := []struct {
name string
Expand Down Expand Up @@ -231,3 +237,132 @@ func TestGetFiles(t *testing.T) {
})
}
}

func TestConcurrentGitRequestsDirectories(t *testing.T) {
repoMock := ArgocdRepositoryMock{mock: &mock.Mock{}}
argocdExampleURL := "https://github.com/argoproj/argocd-example-apps/"
commitID := argocdExampleAppCommitID
// https://github.com/argoproj/argocd-example-apps/tree/67de934fd7f22062a4e2ac8b8d20cfc97f2b4e7f/guestbook
lessDirectoriesCommit := "67de934fd7f22062a4e2ac8b8d20cfc97f2b4e7f"

repoMock.mock.On("GetRepository", mock.Anything, argocdExampleURL).Return(&v1alpha1.Repository{
Insecure: true,
InsecureIgnoreHostKey: true,
Repo: "https://github.com/argoproj/argocd-example-apps/",
}, nil)

type fields struct {
repositoriesDB RepositoryDB
storecreds git.CredsStore
submoduleEnabled bool
keyLock *utils.KeyLock
workers int
}
type args struct {
ctx context.Context
repoURL string
revision []string
}
tests := []struct {
name string
fields fields
args args
want map[string][]string
wantErr assert.ErrorAssertionFunc
}{
{name: "Single", fields: fields{
repositoriesDB: repoMock,
keyLock: utils.NewKeyLock(),
workers: 1,
}, args: args{
ctx: context.Background(),
repoURL: argocdExampleURL,
revision: []string{commitID},
}, want: map[string][]string{commitID: {"apps", "apps/templates", "blue-green", "blue-green/templates", "guestbook", "helm-dependency",
"helm-guestbook", "helm-guestbook/templates", "helm-hooks", "jsonnet-guestbook", "jsonnet-guestbook-tla",
"ksonnet-guestbook", "ksonnet-guestbook/components", "ksonnet-guestbook/environments", "ksonnet-guestbook/environments/default",
"ksonnet-guestbook/environments/dev", "ksonnet-guestbook/environments/prod", "kustomize-guestbook", "plugins", "plugins/kasane",
"plugins/kustomized-helm", "plugins/kustomized-helm/overlays", "pre-post-sync", "sock-shop", "sock-shop/base", "sync-waves"}}, wantErr: assert.NoError},
{name: "Many", fields: fields{
repositoriesDB: repoMock,
keyLock: utils.NewKeyLock(),
workers: 5,
}, args: args{
ctx: context.Background(),
repoURL: argocdExampleURL,
revision: []string{lessDirectoriesCommit, commitID, commitID, lessDirectoriesCommit, commitID},
}, want: map[string][]string{
commitID: {"apps", "apps/templates", "blue-green", "blue-green/templates", "guestbook", "helm-dependency",
"helm-guestbook", "helm-guestbook/templates", "helm-hooks", "jsonnet-guestbook", "jsonnet-guestbook-tla",
"ksonnet-guestbook", "ksonnet-guestbook/components", "ksonnet-guestbook/environments", "ksonnet-guestbook/environments/default",
"ksonnet-guestbook/environments/dev", "ksonnet-guestbook/environments/prod", "kustomize-guestbook", "plugins", "plugins/kasane",
"plugins/kustomized-helm", "plugins/kustomized-helm/overlays", "pre-post-sync", "sock-shop", "sock-shop/base", "sync-waves"},
lessDirectoriesCommit: {"guestbook", "guestbook/components", "guestbook/environments", "guestbook/environments/default"}},
wantErr: assert.NoError,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := &argoCDService{
repositoriesDB: tt.fields.repositoriesDB,
storecreds: tt.fields.storecreds,
submoduleEnabled: tt.fields.submoduleEnabled,
keyLock: tt.fields.keyLock,
}
var wg sync.WaitGroup
for i := 0; i < len(tt.args.revision); i++ {
wg.Add(1)
revision := tt.args.revision[i]
go func() {
got, err := a.GetDirectories(tt.args.ctx, tt.args.repoURL, revision)
if !tt.wantErr(t, err, fmt.Sprintf("GetDirectories(%v, %v, %v)", tt.args.ctx, tt.args.repoURL, revision)) {
return
}
assert.Equalf(t, tt.want[revision], got, "GetDirectories(%v, %v, %v)", tt.args.ctx, tt.args.repoURL, revision)
wg.Done()
}()
}
wg.Wait()
})
}
}

func TestConcurrentGitRequestsFiles(t *testing.T) {
type fields struct {
repositoriesDB RepositoryDB
storecreds git.CredsStore
submoduleEnabled bool
keyLock *utils.KeyLock
}
type args struct {
ctx context.Context
repoURL string
revision string
pattern string
}
tests := []struct {
name string
fields fields
args args
want map[string][]byte
wantErr assert.ErrorAssertionFunc
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := &argoCDService{
repositoriesDB: tt.fields.repositoriesDB,
storecreds: tt.fields.storecreds,
submoduleEnabled: tt.fields.submoduleEnabled,
keyLock: tt.fields.keyLock,
}
got, err := a.GetFiles(tt.args.ctx, tt.args.repoURL, tt.args.revision, tt.args.pattern)
if !tt.wantErr(t, err, fmt.Sprintf("GetFiles(%v, %v, %v, %v)", tt.args.ctx, tt.args.repoURL, tt.args.revision, tt.args.pattern)) {
return
}
assert.Equalf(t, tt.want, got, "GetFiles(%v, %v, %v, %v)", tt.args.ctx, tt.args.repoURL, tt.args.revision, tt.args.pattern)
})
}
}
27 changes: 27 additions & 0 deletions applicationset/utils/keylock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package utils

import "sync"

// KeyLock allows for the locking of any key. Calls are blocking when trying to acquire an existing lock.
// defer keyLock.Lock("foo")()
type KeyLock struct {
lockMap *sync.Map
}

// Lock locks a key and returns the unlock function
func (k *KeyLock) Lock(key string) func() {
value, _ := k.lockMap.LoadOrStore(key, &sync.Mutex{})
m := value.(*sync.Mutex)
m.Lock()
return m.Unlock
}

// Delete removes a key from the store
func (k *KeyLock) Delete(key string) {
k.lockMap.Delete(key)
}

func NewKeyLock() *KeyLock {
var syncMap sync.Map
return &KeyLock{lockMap: &syncMap}
}
29 changes: 29 additions & 0 deletions applicationset/utils/keylock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package utils

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestKeyLock_Delete(t *testing.T) {
k := NewKeyLock()
k.Lock("foo")
k.Delete("foo")
// non-blocking because the key is deleted
k.Lock("foo")()
}

func TestKeyLock_Lock(t *testing.T) {
k := NewKeyLock()
count := 0
k.Lock("foo")
go func() {
// blocking
k.Lock("foo")()
count++
}()

k.Lock("bar")
assert.Equal(t, count, 0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package command

import (
"fmt"
"math"
"net/http"
"os"
"time"
Expand Down Expand Up @@ -57,6 +58,7 @@ func NewCommand() *cobra.Command {
debugLog bool
dryRun bool
enableProgressiveRollouts bool
maxConcurrentReconciles int
)
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
Expand Down Expand Up @@ -130,7 +132,7 @@ func NewCommand() *cobra.Command {
terminalGenerators := map[string]generators.Generator{
"List": generators.NewListGenerator(),
"Clusters": generators.NewClusterGenerator(mgr.GetClient(), ctx, k8sClient, namespace),
"Git": generators.NewGitGenerator(services.NewArgoCDService(argoCDDB, askPassServer, getSubmoduleEnabled())),
"Git": generators.NewGitGenerator(services.NewArgoCDService(argoCDDB, askPassServer, getSubmoduleEnabled(), utils.NewKeyLock())),
"SCMProvider": generators.NewSCMProviderGenerator(mgr.GetClient(), scmAuth),
"ClusterDecisionResource": generators.NewDuckTypeGenerator(ctx, dynamicClient, k8sClient, namespace),
"PullRequest": generators.NewPullRequestGenerator(mgr.GetClient(), scmAuth),
Expand Down Expand Up @@ -179,6 +181,7 @@ func NewCommand() *cobra.Command {
KubeClientset: k8sClient,
ArgoDB: argoCDDB,
EnableProgressiveRollouts: enableProgressiveRollouts,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", "ApplicationSet")
os.Exit(1)
Expand Down Expand Up @@ -208,6 +211,7 @@ func NewCommand() *cobra.Command {
command.Flags().StringVar(&cmdutil.LogLevel, "loglevel", env.StringFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_LOGLEVEL", "info"), "Set the logging level. One of: debug|info|warn|error")
command.Flags().BoolVar(&dryRun, "dry-run", env.ParseBoolFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_DRY_RUN", false), "Enable dry run mode")
command.Flags().BoolVar(&enableProgressiveRollouts, "enable-progressive-rollouts", env.ParseBoolFromEnv("ARGOCD_APPLICATIONSET_CONTROLLER_ENABLE_PROGRESSIVE_ROLLOUTS", false), "Enable use of the experimental progressive rollouts feature.")
command.Flags().IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", env.ParseNumFromEnv("ARGOCD_APPLICATIONSET_MAX_CONCURRENT_RECONCILES", 1, 1, math.MaxInt), "Set the number of concurrent ApplicationSets processed by the controller (default: 1).")
return &command
}

Expand Down