Skip to content

Commit

Permalink
chore(rollout-service): Adapt rollout service to get information from…
Browse files Browse the repository at this point in the history
… the GetAppDetails endpoint (#2041)

Important notes:
* The rollout service no longer listens for to the stream overview
endpoint. It now now listens to the GetChangedApps endpoint, that gets
triggered when there is some change to any app, or when an environment
is edited
* The rollout service still makes a request to the overview service to
obtain the overview. This is done because there is important environment
data there.
* The rollout service functionality should remain UNCHANGED.
Ref: SRX-FIC65Q
  • Loading branch information
miguel-crespo-fdc authored Oct 17, 2024
1 parent 6d4cbb0 commit c9db634
Show file tree
Hide file tree
Showing 11 changed files with 1,069 additions and 488 deletions.
9 changes: 8 additions & 1 deletion pkg/api/v1/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -378,13 +378,20 @@ service OverviewService {
rpc GetOverview (GetOverviewRequest) returns (GetOverviewResponse) {}
rpc StreamOverview (GetOverviewRequest) returns (stream GetOverviewResponse) {}

rpc StreamChangedApps (GetChangedAppsRequest) returns (stream GetChangedAppsResponse) {}
rpc GetAppDetails (GetAppDetailsRequest) returns (GetAppDetailsResponse) {}
}

service EnvironmentService {
rpc GetEnvironmentConfig(GetEnvironmentConfigRequest) returns (GetEnvironmentConfigResponse) {}
}

message GetChangedAppsRequest {}

message GetChangedAppsResponse {
repeated GetAppDetailsResponse changed_apps = 1;
}

message GetAppDetailsRequest {
string app_name = 1;
}
Expand All @@ -396,6 +403,7 @@ message GetAppDetailsResponse {
map<string, Locks> team_locks= 4; //EnvName -> []TeamLocks
}


//Wrapper over array of locks
message Locks {
repeated Lock locks = 2;
Expand All @@ -421,7 +429,6 @@ message Deployment {
DeploymentMetaData deployment_meta_data = 7;
}


message GetOverviewRequest {
// Retrieve the overview at a certain state of the repository. If it's empty, the latest commit will be used.
string git_revision = 1;
Expand Down
49 changes: 41 additions & 8 deletions services/cd-service/pkg/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ Copyright freiheit.com*/

package notify

import "sync"
import (
"sync"
)

type Notify struct {
mx sync.Mutex
listener map[chan struct{}]struct{}
mx sync.Mutex
oveviewListener map[chan struct{}]struct{}
changeAppsListener map[chan ChangedAppNames]struct{}
}

type Unsubscribe = func()
Expand All @@ -31,25 +34,55 @@ func (n *Notify) Subscribe() (<-chan struct{}, Unsubscribe) {

n.mx.Lock()
defer n.mx.Unlock()
if n.listener == nil {
n.listener = map[chan struct{}]struct{}{}
if n.oveviewListener == nil {
n.oveviewListener = map[chan struct{}]struct{}{}
}

n.listener[ch] = struct{}{}
n.oveviewListener[ch] = struct{}{}
return ch, func() {
n.mx.Lock()
defer n.mx.Unlock()
delete(n.listener, ch)
delete(n.oveviewListener, ch)
}
}

func (n *Notify) Notify() {
n.mx.Lock()
defer n.mx.Unlock()
for ch := range n.listener {
for ch := range n.oveviewListener {
select {
case ch <- struct{}{}:
default:
}
}
}

type ChangedAppNames []string

func (n *Notify) SubscribeChangesApps() (<-chan ChangedAppNames, Unsubscribe) {
ch := make(chan ChangedAppNames, 1)
ch <- ChangedAppNames{}

n.mx.Lock()
defer n.mx.Unlock()
if n.changeAppsListener == nil {
n.changeAppsListener = map[chan ChangedAppNames]struct{}{}
}
n.changeAppsListener[ch] = struct{}{}
return ch, func() {
n.mx.Lock()
defer n.mx.Unlock()
delete(n.changeAppsListener, ch)
}
}

func (n *Notify) NotifyChangedApps(changedApps ChangedAppNames) {
n.mx.Lock()
defer n.mx.Unlock()
for ch := range n.changeAppsListener {
select {
case ch <- changedApps:
default:
}
}
}
19 changes: 16 additions & 3 deletions services/cd-service/pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,6 @@ func (r *repository) applyTransformerBatches(transformerBatches []transformerBat
}
}
}

return transformerBatches, nil, changes
}

Expand Down Expand Up @@ -740,7 +739,6 @@ func (r *repository) ProcessQueueOnce(ctx context.Context, e transformerBatch, c
}

transformerBatches, err, changes := r.applyTransformerBatches(transformerBatches, true)

if len(transformerBatches) == 0 {
return
}
Expand Down Expand Up @@ -794,6 +792,7 @@ func (r *repository) ProcessQueueOnce(ctx context.Context, e transformerBatch, c
ddSpan.Finish()

r.notify.Notify()
r.notifyChangedApps(changes)
}

func UpdateDatadogMetricsDB(ctx context.Context, state *State, r Repository, changes *TransformerResult, now time.Time) error {
Expand Down Expand Up @@ -1122,6 +1121,7 @@ func (r *repository) Apply(ctx context.Context, transformers ...Transformer) err
}
}
r.notify.Notify()
r.notifyChangedApps(changes)
return nil
} else {
eCh := r.applyDeferred(ctx, transformers...)
Expand All @@ -1134,6 +1134,20 @@ func (r *repository) Apply(ctx context.Context, transformers ...Transformer) err
}
}

func (r *repository) notifyChangedApps(changes *TransformerResult) {
var changedAppNames []string
var seen = make(map[string]bool)
for _, app := range changes.ChangedApps {
if _, ok := seen[app.App]; !ok {
seen[app.App] = true
changedAppNames = append(changedAppNames, app.App)
}
}
if len(changedAppNames) != 0 {
r.notify.NotifyChangedApps(changedAppNames)
}
}

func (r *repository) applyDeferred(ctx context.Context, transformers ...Transformer) <-chan error {
return r.queue.add(ctx, transformers)
}
Expand Down Expand Up @@ -2210,7 +2224,6 @@ func (s *State) WriteCurrentEnvironmentLocks(ctx context.Context, transaction *s
return err
}
for lockId, lock := range ls {
fmt.Printf("LOCK: %s\n", lock.CreatedAt.String())
currentEnv := db.EnvironmentLock{
EslVersion: 0,
Env: envName,
Expand Down
22 changes: 16 additions & 6 deletions services/cd-service/pkg/repository/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2792,6 +2792,17 @@ func (c *CreateEnvironment) Transform(
if err != nil {
return "", fmt.Errorf("Unable to write overview cache, error: %w", err)
}

//Should be empty on new environments
envApps, err := state.GetEnvironmentApplications(ctx, transaction, c.Environment)
if err != nil {
return "", fmt.Errorf("Unable to read environment, error: %w", err)

}
for _, app := range envApps {
t.AddAppEnv(app, c.Environment, "")
}

} else {
fs := state.Filesystem
envDir := fs.Join("environments", c.Environment)
Expand Down Expand Up @@ -3107,11 +3118,6 @@ func (c *DeployApplicationVersion) Transform(
if err := util.WriteFile(fs, manifestFilename, manifestContent, 0666); err != nil {
return "", err
}
teamOwner, err := state.GetApplicationTeamOwner(ctx, transaction, c.Application)
if err != nil {
return "", err
}
t.AddAppEnv(c.Application, c.Environment, teamOwner)

if err := util.WriteFile(fs, fs.Join(applicationDir, "deployed_by"), []byte(user.Name), 0666); err != nil {
return "", err
Expand All @@ -3124,7 +3130,11 @@ func (c *DeployApplicationVersion) Transform(
return "", err
}
}

teamOwner, err := state.GetApplicationTeamOwner(ctx, transaction, c.Application)
if err != nil {
return "", err
}
t.AddAppEnv(c.Application, c.Environment, teamOwner)
s := State{
Commit: nil,
MinorRegexes: state.MinorRegexes,
Expand Down
81 changes: 76 additions & 5 deletions services/cd-service/pkg/service/overview.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ type OverviewServiceServer struct {
RepositoryConfig repository.RepositoryConfig
Shutdown <-chan struct{}

notify notify.Notify
Context context.Context
init sync.Once
response atomic.Value
notify notify.Notify
Context context.Context
overviewStreamingInitFunc sync.Once
changedAppsStreamingInitFunc sync.Once
response atomic.Value

DBHandler *db.DBHandler
}
Expand Down Expand Up @@ -334,6 +335,7 @@ func (o *OverviewServiceServer) StreamOverview(in *api.GetOverviewRequest,
return nil
case <-ch:
ov := o.response.Load().(*api.GetOverviewResponse)

if err := stream.Send(ov); err != nil {
// if we don't log this here, the details will be lost - so this is an exception to the rule "either return an error or log it".
// for example if there's an invalid encoding, grpc will just give a generic error like
Expand All @@ -349,8 +351,48 @@ func (o *OverviewServiceServer) StreamOverview(in *api.GetOverviewRequest,
}
}

func (o *OverviewServiceServer) StreamChangedApps(in *api.GetChangedAppsRequest,
stream api.OverviewService_StreamChangedAppsServer) error {
ch, unsubscribe := o.subscribeChangedApps()
defer unsubscribe()
done := stream.Context().Done()
for {
select {
case <-o.Shutdown:
return nil
case changedAppsNames := <-ch:
if len(changedAppsNames) == 0 { //This only happens when a channel is first triggered, so we send all apps
allApps, err := o.getAllAppNames(stream.Context())
if err != nil {
return err
}
changedAppsNames = allApps
}
ov := &api.GetChangedAppsResponse{
ChangedApps: make([]*api.GetAppDetailsResponse, len(changedAppsNames)),
}
for idx, appName := range changedAppsNames {
response, err := o.GetAppDetails(stream.Context(), &api.GetAppDetailsRequest{AppName: appName})
if err != nil {
return err
}
ov.ChangedApps[idx] = response
}

logger.FromContext(stream.Context()).Sugar().Infof("Sending changes apps: '%v'\n", changedAppsNames)
if err := stream.Send(ov); err != nil {
logger.FromContext(stream.Context()).Error("error sending changed apps response:", zap.Error(err), zap.String("changedAppsNames", fmt.Sprintf("%+v", ov)))
return err
}

case <-done:
return nil
}
}
}

func (o *OverviewServiceServer) subscribe() (<-chan struct{}, notify.Unsubscribe) {
o.init.Do(func() {
o.overviewStreamingInitFunc.Do(func() {
ch, unsub := o.Repository.Notify().Subscribe()
// Channels obtained from subscribe are by default triggered
//
Expand All @@ -364,6 +406,7 @@ func (o *OverviewServiceServer) subscribe() (<-chan struct{}, notify.Unsubscribe
case <-o.Shutdown:
return
case <-ch:

o.update(o.Repository.State())
}
}
Expand All @@ -372,6 +415,34 @@ func (o *OverviewServiceServer) subscribe() (<-chan struct{}, notify.Unsubscribe
return o.notify.Subscribe()
}

func (o *OverviewServiceServer) subscribeChangedApps() (<-chan notify.ChangedAppNames, notify.Unsubscribe) {
o.changedAppsStreamingInitFunc.Do(func() {
ch, unsub := o.Repository.Notify().SubscribeChangesApps()
// Channels obtained from subscribe are by default triggered
//
// This means, we have to wait here until the changedApps are loaded for the first time.
<-ch
go func() {
defer unsub()
for {
select {
case <-o.Shutdown:
return
case changedApps := <-ch:
o.notify.NotifyChangedApps(changedApps)
}
}
}()
})
return o.notify.SubscribeChangesApps()
}

func (o *OverviewServiceServer) getAllAppNames(ctx context.Context) ([]string, error) {
return db.WithTransactionMultipleEntriesT(o.DBHandler, ctx, true, func(ctx context.Context, transaction *sql.Tx) ([]string, error) {
return o.Repository.State().GetApplications(ctx, transaction)
})
}

func (o *OverviewServiceServer) update(s *repository.State) {
r, err := o.getOverviewDB(o.Context, s)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions services/frontend-service/pkg/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,24 @@ func (p *GrpcProxy) StreamOverview(
}
}

func (p *GrpcProxy) StreamChangedApps(
in *api.GetChangedAppsRequest,
stream api.OverviewService_StreamChangedAppsServer) error {
if resp, err := p.OverviewClient.StreamChangedApps(stream.Context(), in); err != nil {
return err
} else {
for {
if item, err := resp.Recv(); err != nil {
return err
} else {
if err := stream.Send(item); err != nil {
return err
}
}
}
}
}

func (p *GrpcProxy) StreamStatus(in *api.StreamStatusRequest, stream api.RolloutService_StreamStatusServer) error {
if p.RolloutServiceClient == nil {
return status.Error(codes.Unimplemented, "rollout service not configured")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestHandleCommitDeployments(t *testing.T) {
inputTail: "123456/",
failGrpcCall: false,
expectedStatusCode: http.StatusOK,
expectedResponse: "{\"deploymentStatus\":{\"app1\":{\"deploymentStatus\":{\"dev\":\"DEPLOYED\", \"prod\":\"UNKNOWN\", \"stage\":\"PENDING\"}}}}\n",
expectedResponse: "{\"deploymentStatus\":{\"app1\":{\"deploymentStatus\":{\"dev\":\"DEPLOYED\",\"prod\":\"UNKNOWN\",\"stage\":\"PENDING\"}}}}\n",
},
}
for _, tc := range tcs {
Expand Down
Loading

0 comments on commit c9db634

Please sign in to comment.