Skip to content

Commit

Permalink
fix: allow disabling the request queue (#1985)
Browse files Browse the repository at this point in the history
This introduces a new helm option "cd.backendConfig.disableQueue`. When
true, the cd-service processes incoming requests in parallel.

Ref: SRX-YNEAHU
  • Loading branch information
sven-urbanski-freiheit-com authored Sep 27, 2024
1 parent 17a9b82 commit ebb34b9
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 6 deletions.
2 changes: 2 additions & 0 deletions charts/kuberpult/templates/cd-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ spec:
{{- end }}
- name: KUBERPULT_GIT_NETWORK_TIMEOUT
value: "{{ .Values.git.networkTimeout }}"
- name: KUBERPULT_DISABLE_QUEUE
value: "{{ .Values.cd.backendConfig.disableQueue }}"
- name: KUBERPULT_GIT_WRITE_COMMIT_DATA
value: "{{ .Values.git.enableWritingCommitData }}"
- name: KUBERPULT_GIT_MAXIMUM_COMMITS_PER_PUSH
Expand Down
4 changes: 4 additions & 0 deletions charts/kuberpult/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ cd:
create: false # Add backend config for health checks on GKE only
timeoutSec: 300 # 30sec is the default on gcp loadbalancers, however kuberpult needs more with parallel requests. It is the time how long the loadbalancer waits for kuberpult to finish calls to the rest endpoint "release"
queueSize: 5
# Disabling the queue is as of now an experimental feature. It's only possible to use with db.writeEslTableOnly=false.
# With the queue, the cd-service processes only one request at a time, which is very much required when using git.
# With the database enabled, this is not required anymore.
disableQueue: false
resources:
limits:
cpu: 2
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ services:
image: europe-west3-docker.pkg.dev/fdc-public-docker-registry/kuberpult/kuberpult-cd-service:local
environment:
- LOG_LEVEL=INFO
- KUBERPULT_DISABLE_QUEUE=true
- KUBERPULT_GIT_URL=/kp/kuberpult/repository_remote
- KUBERPULT_DB_LOCATION=postgres
- KUBERPULT_DB_NAME=kuberpult
Expand Down
4 changes: 4 additions & 0 deletions services/cd-service/pkg/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type Config struct {
DbSslMode string `default:"verify-full" split_words:"true"`
MinorRegexes string `default:"" split_words:"true"`
AllowedDomains []string `split_words:"true"`

DisableQueue bool `required:"true" split_words:"true"`
}

func (c *Config) storageBackend() repository.StorageBackend {
Expand Down Expand Up @@ -326,6 +328,8 @@ func RunServer() {
ArgoCdGenerateFiles: c.ArgoCdGenerateFiles,
DBHandler: dbHandler,
CloudRunClient: cloudRunClient,

DisableQueue: c.DisableQueue,
}

repo, repoQueue, err := repository.New2(ctx, cfg)
Expand Down
42 changes: 36 additions & 6 deletions services/cd-service/pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ type RepositoryConfig struct {

DBHandler *db.DBHandler
CloudRunClient *cloudrun.CloudRunClient

DisableQueue bool
}

func openOrCreate(path string, storageBackend StorageBackend) (*git.Repository, error) {
Expand Down Expand Up @@ -563,6 +565,7 @@ func (r *repository) applyTransformerBatches(transformerBatches []transformerBat
})

if txErr != nil {
logger.FromContext(e.ctx).Sugar().Warnf("txError in applyTransformerBatches: %w", txErr)
e.finish(txErr)
transformerBatches = append(transformerBatches[:i], transformerBatches[i+1:]...)
continue //Skip this batch
Expand Down Expand Up @@ -1089,12 +1092,39 @@ func (r *repository) FetchAndReset(ctx context.Context) error {
}

func (r *repository) Apply(ctx context.Context, transformers ...Transformer) error {
eCh := r.applyDeferred(ctx, transformers...)
select {
case err := <-eCh:
return err
case <-ctx.Done():
return ctx.Err()
if r.config.DisableQueue && r.DB.ShouldUseOtherTables() {
changes, err := db.WithTransactionT(r.DB, ctx, 2, false, func(ctx context.Context, transaction *sql.Tx) (*TransformerResult, error) {
subChanges, applyErr := r.ApplyTransformers(ctx, transaction, transformers...)
if applyErr != nil {
return nil, applyErr
}
return subChanges, nil
})

if err != nil {
return err
}
if r.config.DogstatsdEvents {
var ddError error
if r.DB.ShouldUseEslTable() {
ddError = UpdateDatadogMetricsDB(ctx, r.State(), r, changes, time.Now())
} else {
ddError = UpdateDatadogMetrics(ctx, nil, r.State(), r, changes, time.Now())
}
if ddError != nil {
logger.FromContext(ctx).Warn(fmt.Sprintf("Could not send datadog metrics/events %v", ddError))
}
}
r.notify.Notify()
return nil
} else {
eCh := r.applyDeferred(ctx, transformers...)
select {
case err := <-eCh:
return err
case <-ctx.Done():
return ctx.Err()
}
}
}

Expand Down

0 comments on commit ebb34b9

Please sign in to comment.