Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(rollout-service): Adapt rollout service to get information from the GetAppDetails endpoint #2041

Merged
merged 28 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b7d991a
channels
miguel-crespo-fdc Oct 10, 2024
66a3b82
PRogress on new endpoint
miguel-crespo-fdc Oct 11, 2024
f0f465d
Adapt rollout service for new app details
miguel-crespo-fdc Oct 14, 2024
aedf591
lint
miguel-crespo-fdc Oct 14, 2024
b57c420
linter
miguel-crespo-fdc Oct 14, 2024
2936020
Merge branch 'main' into mc/new-endpoint-rollout
miguel-crespo-fdc Oct 14, 2024
8bed2c5
remove repo metadata
miguel-crespo-fdc Oct 14, 2024
337e7e8
Merge branch 'mc/new-endpoint-rollout' of github.com:freiheit-com/kub…
miguel-crespo-fdc Oct 14, 2024
c8bbcac
team
miguel-crespo-fdc Oct 14, 2024
9ea51a6
final touches
miguel-crespo-fdc Oct 14, 2024
a42bc74
self PR review
miguel-crespo-fdc Oct 14, 2024
5f2773e
self review PR
miguel-crespo-fdc Oct 14, 2024
d68f2c9
reduce noise
miguel-crespo-fdc Oct 14, 2024
f8ccc9e
changed apps
miguel-crespo-fdc Oct 14, 2024
e40c48e
reducing noise
miguel-crespo-fdc Oct 14, 2024
cfad517
noise
miguel-crespo-fdc Oct 14, 2024
4a65847
Merge branch 'main' into mc/new-endpoint-rollout
miguel-crespo-fdc Oct 14, 2024
4a0fdbe
Merge branch 'main' into mc/new-endpoint-rollout
miguel-crespo-fdc Oct 15, 2024
808f58b
PR Concerns
miguel-crespo-fdc Oct 16, 2024
f68eca1
Merge branch 'mc/new-endpoint-rollout' of github.com:freiheit-com/kub…
miguel-crespo-fdc Oct 16, 2024
167a0fa
Flakeyness
miguel-crespo-fdc Oct 16, 2024
7309fcf
Merge branch 'main' into mc/new-endpoint-rollout
miguel-crespo-fdc Oct 16, 2024
d7a3698
revert
miguel-crespo-fdc Oct 16, 2024
14b80f7
Merge branch 'mc/new-endpoint-rollout' of github.com:freiheit-com/kub…
miguel-crespo-fdc Oct 16, 2024
237571f
more flakeyness
miguel-crespo-fdc Oct 16, 2024
d187236
Merge branch 'main' into mc/new-endpoint-rollout
miguel-crespo-fdc Oct 16, 2024
ddfd0a9
PR concerns from Hannes
miguel-crespo-fdc Oct 16, 2024
3c451fc
Merge branch 'main' into mc/new-endpoint-rollout
miguel-crespo-fdc Oct 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/api/v1/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -378,13 +378,20 @@ service OverviewService {
rpc GetOverview (GetOverviewRequest) returns (GetOverviewResponse) {}
rpc StreamOverview (GetOverviewRequest) returns (stream GetOverviewResponse) {}

rpc StreamChangedApps (GetChangedAppsRequest) returns (stream GetChangedAppsResponse) {}
rpc GetAppDetails (GetAppDetailsRequest) returns (GetAppDetailsResponse) {}
}

service EnvironmentService {
rpc GetEnvironmentConfig(GetEnvironmentConfigRequest) returns (GetEnvironmentConfigResponse) {}
}

message GetChangedAppsRequest {}

message GetChangedAppsResponse {
repeated string changed_apps = 1;
}

message GetAppDetailsRequest {
string app_name = 1;
}
Expand All @@ -396,6 +403,7 @@ message GetAppDetailsResponse {
map<string, Locks> team_locks= 4; //EnvName -> []TeamLocks
}


//Wrapper over array of locks
message Locks {
repeated Lock locks = 2;
Expand All @@ -421,7 +429,6 @@ message Deployment {
DeploymentMetaData deployment_meta_data = 7;
}


message GetOverviewRequest {
// Retrieve the overview at a certain state of the repository. If it's empty, the latest commit will be used.
string git_revision = 1;
Expand Down
37 changes: 34 additions & 3 deletions services/cd-service/pkg/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ Copyright freiheit.com*/

package notify

import "sync"
import (
"sync"
)

type Notify struct {
mx sync.Mutex
listener map[chan struct{}]struct{}
mx sync.Mutex
listener map[chan struct{}]struct{}
miguel-crespo-fdc marked this conversation as resolved.
Show resolved Hide resolved
changeAppsListener map[chan []string][]string
}

type Unsubscribe = func()
Expand Down Expand Up @@ -53,3 +56,31 @@ func (n *Notify) Notify() {
}
}
}

func (n *Notify) SubscribeChangesApps() (<-chan []string, Unsubscribe) {
miguel-crespo-fdc marked this conversation as resolved.
Show resolved Hide resolved
miguel-crespo-fdc marked this conversation as resolved.
Show resolved Hide resolved
ch := make(chan []string, 1)
ch <- []string{}

n.mx.Lock()
defer n.mx.Unlock()
if n.changeAppsListener == nil {
n.changeAppsListener = map[chan []string][]string{}
}
n.changeAppsListener[ch] = []string{}
return ch, func() {
n.mx.Lock()
defer n.mx.Unlock()
delete(n.changeAppsListener, ch)
}
}

func (n *Notify) NotifyChangedApps(changedApps []string) {
n.mx.Lock()
defer n.mx.Unlock()
for ch := range n.changeAppsListener {
select {
case ch <- changedApps:
miguel-crespo-fdc marked this conversation as resolved.
Show resolved Hide resolved
default:
}
}
}
27 changes: 24 additions & 3 deletions services/cd-service/pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ func (r *repository) applyTransformerBatches(transformerBatches []transformerBat
}
}
}

miguel-crespo-fdc marked this conversation as resolved.
Show resolved Hide resolved
return transformerBatches, nil, changes
}

Expand Down Expand Up @@ -740,7 +740,6 @@ func (r *repository) ProcessQueueOnce(ctx context.Context, e transformerBatch, c
}

transformerBatches, err, changes := r.applyTransformerBatches(transformerBatches, true)

if len(transformerBatches) == 0 {
return
}
Expand Down Expand Up @@ -794,6 +793,17 @@ func (r *repository) ProcessQueueOnce(ctx context.Context, e transformerBatch, c
ddSpan.Finish()

r.notify.Notify()
var changedAppNames []string
var seen = make(map[string]bool)
for _, app := range changes.ChangedApps {
if _, ok := seen[app.App]; !ok {
seen[app.App] = true
changedAppNames = append(changedAppNames, app.App)
}
}
if len(changedAppNames) != 0 {
r.notify.NotifyChangedApps(changedAppNames)
}
}

func UpdateDatadogMetricsDB(ctx context.Context, state *State, r Repository, changes *TransformerResult, now time.Time) error {
Expand Down Expand Up @@ -1122,6 +1132,18 @@ func (r *repository) Apply(ctx context.Context, transformers ...Transformer) err
}
}
r.notify.Notify()

var changedAppNames []string
var seen = make(map[string]bool)
for _, app := range changes.ChangedApps {
if _, ok := seen[app.App]; !ok {
seen[app.App] = true
changedAppNames = append(changedAppNames, app.App)
}
}
if len(changedAppNames) != 0 {
r.notify.NotifyChangedApps(changedAppNames)
}
miguel-crespo-fdc marked this conversation as resolved.
Show resolved Hide resolved
return nil
} else {
eCh := r.applyDeferred(ctx, transformers...)
Expand Down Expand Up @@ -2210,7 +2232,6 @@ func (s *State) WriteCurrentEnvironmentLocks(ctx context.Context, transaction *s
return err
}
for lockId, lock := range ls {
fmt.Printf("LOCK: %s\n", lock.CreatedAt.String())
currentEnv := db.EnvironmentLock{
EslVersion: 0,
Env: envName,
Expand Down
23 changes: 17 additions & 6 deletions services/cd-service/pkg/repository/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2792,6 +2792,17 @@ func (c *CreateEnvironment) Transform(
if err != nil {
return "", fmt.Errorf("Unable to write overview cache, error: %w", err)
}

//Should be empty on new environments
envApps, err := state.GetEnvironmentApplications(ctx, transaction, c.Environment)
if err != nil {
return "", fmt.Errorf("Unable to read environment, error: %w", err)

}
for _, app := range envApps {
t.AddAppEnv(app, c.Environment, "")
}

miguel-crespo-fdc marked this conversation as resolved.
Show resolved Hide resolved
} else {
fs := state.Filesystem
envDir := fs.Join("environments", c.Environment)
Expand Down Expand Up @@ -3069,6 +3080,7 @@ func (c *DeployApplicationVersion) Transform(
if err != nil {
return "", fmt.Errorf("could not write oldest deployment for %v - %v", newDeployment, err)
}

miguel-crespo-fdc marked this conversation as resolved.
Show resolved Hide resolved
} else {
//Check if there is a version of target app already deployed on target environment
if _, err := fs.Lstat(versionFile); err == nil {
Expand Down Expand Up @@ -3107,11 +3119,6 @@ func (c *DeployApplicationVersion) Transform(
if err := util.WriteFile(fs, manifestFilename, manifestContent, 0666); err != nil {
return "", err
}
teamOwner, err := state.GetApplicationTeamOwner(ctx, transaction, c.Application)
if err != nil {
return "", err
}
t.AddAppEnv(c.Application, c.Environment, teamOwner)

if err := util.WriteFile(fs, fs.Join(applicationDir, "deployed_by"), []byte(user.Name), 0666); err != nil {
return "", err
Expand All @@ -3124,7 +3131,11 @@ func (c *DeployApplicationVersion) Transform(
return "", err
}
}

teamOwner, err := state.GetApplicationTeamOwner(ctx, transaction, c.Application)
if err != nil {
return "", err
}
t.AddAppEnv(c.Application, c.Environment, teamOwner)
s := State{
Commit: nil,
MinorRegexes: state.MinorRegexes,
Expand Down
57 changes: 53 additions & 4 deletions services/cd-service/pkg/service/overview.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ type OverviewServiceServer struct {
RepositoryConfig repository.RepositoryConfig
Shutdown <-chan struct{}

notify notify.Notify
Context context.Context
init sync.Once
response atomic.Value
notify notify.Notify
Context context.Context
init sync.Once
miguel-crespo-fdc marked this conversation as resolved.
Show resolved Hide resolved
changedAppsStreamingInitFunc sync.Once
response atomic.Value

DBHandler *db.DBHandler
}
Expand Down Expand Up @@ -349,6 +350,31 @@ func (o *OverviewServiceServer) StreamOverview(in *api.GetOverviewRequest,
}
}

func (o *OverviewServiceServer) StreamChangedApps(in *api.GetChangedAppsRequest,
stream api.OverviewService_StreamChangedAppsServer) error {
ch, unsubscribe := o.subscribeChangedApps()
defer unsubscribe()
done := stream.Context().Done()
for {
select {
case <-o.Shutdown:
return nil
case changedApps := <-ch:
ov := &api.GetChangedAppsResponse{
ChangedApps: changedApps,
}
logger.FromContext(stream.Context()).Sugar().Infof("Got changes apps: '%v'\n", changedApps)
if err := stream.Send(ov); err != nil {
logger.FromContext(stream.Context()).Error("error sending changed apps response:", zap.Error(err), zap.String("changedApps", fmt.Sprintf("%+v", ov)))
return err
}

case <-done:
return nil
}
}
}

func (o *OverviewServiceServer) subscribe() (<-chan struct{}, notify.Unsubscribe) {
o.init.Do(func() {
ch, unsub := o.Repository.Notify().Subscribe()
Expand All @@ -372,6 +398,29 @@ func (o *OverviewServiceServer) subscribe() (<-chan struct{}, notify.Unsubscribe
return o.notify.Subscribe()
}

func (o *OverviewServiceServer) subscribeChangedApps() (<-chan []string, notify.Unsubscribe) {
o.changedAppsStreamingInitFunc.Do(func() {
ch, unsub := o.Repository.Notify().SubscribeChangesApps()
// Channels obtained from subscribe are by default triggered
//
// This means, we have to wait here until the first overview is loaded.
miguel-crespo-fdc marked this conversation as resolved.
Show resolved Hide resolved
<-ch
o.notify.NotifyChangedApps([]string{})
go func() {
defer unsub()
for {
select {
case <-o.Shutdown:
return
case changedApps := <-ch:
o.notify.NotifyChangedApps(changedApps)
}
}
}()
})
return o.notify.SubscribeChangesApps()
}

func (o *OverviewServiceServer) update(s *repository.State) {
r, err := o.getOverviewDB(o.Context, s)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions services/frontend-service/pkg/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,24 @@ func (p *GrpcProxy) StreamOverview(
}
}

func (p *GrpcProxy) StreamChangedApps(
in *api.GetChangedAppsRequest,
stream api.OverviewService_StreamChangedAppsServer) error {
if resp, err := p.OverviewClient.StreamChangedApps(stream.Context(), in); err != nil {
return err
} else {
for {
if item, err := resp.Recv(); err != nil {
return err
} else {
if err := stream.Send(item); err != nil {
return err
}
}
}
}
}

func (p *GrpcProxy) StreamStatus(in *api.StreamStatusRequest, stream api.RolloutService_StreamStatusServer) error {
if p.RolloutServiceClient == nil {
return status.Error(codes.Unimplemented, "rollout service not configured")
Expand Down
Loading
Loading