From 78c6e5e8827935f6e70833dda73d14a4ed292af4 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Sun, 5 Nov 2023 23:39:41 -0500 Subject: [PATCH] feat(cd): rust-ceramic deployment --- cd/manager/common/aws/ecs/ecs.go | 45 ++++++++++++++++++++--------- cd/manager/jobs/deploy.go | 49 ++++++++++++++++++++------------ cd/manager/models.go | 49 ++++++++++++++++++++------------ cd/manager/notifs/discord.go | 4 +-- cd/manager/repository/github.go | 18 ++++++++---- cd/manager/utils.go | 14 +++++---- 6 files changed, 117 insertions(+), 62 deletions(-) diff --git a/cd/manager/common/aws/ecs/ecs.go b/cd/manager/common/aws/ecs/ecs.go index d57326c..57371cc 100644 --- a/cd/manager/common/aws/ecs/ecs.go +++ b/cd/manager/common/aws/ecs/ecs.go @@ -36,6 +36,7 @@ const ( ) const resourceTag = "Ceramic" +const publicEcrUri = "public.ecr.aws/r5b3e0r5/3box/" func NewEcs(cfg aws.Config) manager.Deployment { ecrUri := os.Getenv("AWS_ACCOUNT_ID") + ".dkr.ecr." + os.Getenv("AWS_REGION") + ".amazonaws.com/" @@ -123,12 +124,23 @@ func (e Ecs) GetLayout(clusters []string) (*manager.Layout, error) { layout.Clusters[cluster] = &manager.Cluster{ServiceTasks: &manager.TaskSet{Tasks: map[string]*manager.Task{}}} for _, serviceArn := range clusterServices.ServiceArns { service := e.serviceNameFromArn(serviceArn) - ecsService, err := e.describeEcsService(cluster, service) - if err != nil { + if ecsService, err := e.describeEcsService(cluster, service); err != nil { log.Printf("getLayout: describe service error: %s, %s, %v", cluster, service, err) return nil, err + } else { + taskDefArn := *ecsService.Services[0].TaskDefinition + containerDefNames := make([]string, 0, 1) + if taskDef, err := e.getEcsTaskDefinition(taskDefArn); err != nil { + log.Printf("getLayout: get task def error: %s, %s, %s, %v", taskDefArn, cluster, service, err) + return nil, err + } else { + for _, containerDef := range taskDef.ContainerDefinitions { + containerDefNames = append(containerDefNames, *containerDef.Name) + } + } + // Return the names of all the containers associated with this task definition + layout.Clusters[cluster].ServiceTasks.Tasks[service] = &manager.Task{Id: taskDefArn, Name: strings.Join(containerDefNames, ",")} } - layout.Clusters[cluster].ServiceTasks.Tasks[service] = &manager.Task{Id: *ecsService.Services[0].TaskDefinition} } } } @@ -137,9 +149,9 @@ func (e Ecs) GetLayout(clusters []string) (*manager.Layout, error) { func (e Ecs) UpdateLayout(layout *manager.Layout, commitHash string) error { for clusterName, cluster := range layout.Clusters { - clusterRepo := layout.Repo - if len(cluster.Repo) > 0 { - clusterRepo = cluster.Repo + clusterRepo := e.getEcrRepo(*layout.Repo) // The main layout repo should never be null + if cluster.Repo != nil { + clusterRepo = e.getEcrRepo(*cluster.Repo) } if err := e.updateEnvCluster(cluster, clusterName, clusterRepo, commitHash); err != nil { return err @@ -242,7 +254,7 @@ func (e Ecs) updateEcsTaskDefinition(taskDefArn, image, containerName string) (s for idx, containerDef := range taskDef.ContainerDefinitions { if *containerDef.Name == containerName { - taskDef.ContainerDefinitions[idx].Image = aws.String(e.ecrUri + image) + taskDef.ContainerDefinitions[idx].Image = aws.String(image) regTaskDefInput := &ecs.RegisterTaskDefinitionInput{ ContainerDefinitions: taskDef.ContainerDefinitions, Family: taskDef.Family, @@ -432,8 +444,8 @@ func (e Ecs) updateEnvTaskSet(taskSet *manager.TaskSet, deployType string, clust if taskSet != nil { for taskSetName, task := range taskSet.Tasks { taskSetRepo := clusterRepo - if len(taskSet.Repo) > 0 { - taskSetRepo = taskSet.Repo + if taskSet.Repo != nil { + taskSetRepo = e.getEcrRepo(*taskSet.Repo) } switch deployType { case deployType_Service: @@ -454,8 +466,8 @@ func (e Ecs) updateEnvTaskSet(taskSet *manager.TaskSet, deployType string, clust func (e Ecs) updateEnvServiceTask(task *manager.Task, cluster, service, taskSetRepo, commitHash string) error { taskRepo := taskSetRepo - if len(task.Repo) > 0 { - taskRepo = task.Repo + if task.Repo != nil { + taskRepo = e.getEcrRepo(*task.Repo) } if id, err := e.updateEcsService(cluster, service, taskRepo+":"+commitHash, task.Name, task.Temp); err != nil { return err @@ -467,8 +479,8 @@ func (e Ecs) updateEnvServiceTask(task *manager.Task, cluster, service, taskSetR func (e Ecs) updateEnvTask(task *manager.Task, cluster, taskName, taskSetRepo, commitHash string) error { taskRepo := taskSetRepo - if len(task.Repo) > 0 { - taskRepo = task.Repo + if task.Repo != nil { + taskRepo = e.getEcrRepo(*task.Repo) } if id, err := e.updateEcsTask(cluster, taskName, taskRepo+":"+commitHash, task.Name, task.Temp); err != nil { return err @@ -547,3 +559,10 @@ func (e Ecs) parseEcsFailures(ecsFailures []types.Failure) []ecsFailure { } return failures } + +func (e Ecs) getEcrRepo(repo manager.Repo) string { + if repo.Public { + return publicEcrUri + repo.Name + } + return e.ecrUri + repo.Name +} diff --git a/cd/manager/jobs/deploy.go b/cd/manager/jobs/deploy.go index 6dfce3d..61874bb 100644 --- a/cd/manager/jobs/deploy.go +++ b/cd/manager/jobs/deploy.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "golang.org/x/exp/slices" + "github.com/3box/pipeline-tools/cd/manager" "github.com/3box/pipeline-tools/cd/manager/common/job" ) @@ -47,6 +49,7 @@ const ( containerName_CasApi string = "cas_api" containerName_CasWorker string = "cas_anchor" containerName_CasV5Scheduler string = "scheduler" + containerName_RustCeramic string = "rust-ceramic" ) const defaultFailureTime = 30 * time.Minute @@ -146,8 +149,11 @@ func (d deployJob) prepareJob(deployHashes map[manager.DeployComponent]string) e // The last two cases will only happen when redeploying manually, so we can note that in the notification. if d.sha == buildHashLatest { shaTag, _ := d.state.Params[job.DeployJobParam_ShaTag].(string) - if latestSha, err := d.repo.GetLatestCommitHash( - manager.ComponentRepo(d.component), + if repo, err := manager.ComponentRepo(d.component); err != nil { + return err + } else if latestSha, err := d.repo.GetLatestCommitHash( + repo.Org, + repo.Name, d.envBranch(d.component, manager.EnvType(os.Getenv(manager.EnvVar_Env))), shaTag, ); err != nil { @@ -205,21 +211,19 @@ func (d deployJob) generateEnvLayout(component manager.DeployComponent) (*manage publicCluster := "ceramic-" + d.env + "-ex" casCluster := "ceramic-" + d.env + "-cas" casV5Cluster := "app-cas-" + d.env - ecrRepo, err := d.componentEcrRepo(component) - if err != nil { - log.Printf("generateEnvLayout: ecr repo error: %s, %v", component, err) - return nil, err - } clusters := []string{privateCluster, publicCluster, casCluster, casV5Cluster} // Populate the service layout by retrieving the clusters/services from ECS if currentLayout, err := d.d.GetLayout(clusters); err != nil { log.Printf("generateEnvLayout: get layout error: %s, %v", component, err) return nil, err + } else if ecrRepo, err := d.componentEcrRepo(component); err != nil { + log.Printf("generateEnvLayout: get ecr repo error: %s, %v", component, err) + return nil, err } else { newLayout := &manager.Layout{Clusters: map[string]*manager.Cluster{}, Repo: ecrRepo} for cluster, clusterLayout := range currentLayout.Clusters { for service, task := range clusterLayout.ServiceTasks.Tasks { - if newTask := d.componentTask(component, cluster, service); newTask != nil { + if newTask := d.componentTask(component, cluster, service, strings.Split(task.Name, ",")); newTask != nil { if newLayout.Clusters[cluster] == nil { // We found at least one matching task, so we can start populating the cluster layout. newLayout.Clusters[cluster] = &manager.Cluster{ServiceTasks: &manager.TaskSet{Tasks: map[string]*manager.Task{}}} @@ -237,7 +241,7 @@ func (d deployJob) generateEnvLayout(component manager.DeployComponent) (*manage if component == manager.DeployComponent_Cas { newLayout.Clusters[casCluster].Tasks = &manager.TaskSet{Tasks: map[string]*manager.Task{ casCluster + "-" + serviceSuffix_CasWorker: { - Repo: anchorWorkerRepo, + Repo: &manager.Repo{Name: anchorWorkerRepo}, Temp: true, // Anchor workers do not stay up permanently Name: containerName_CasWorker, }, @@ -247,7 +251,7 @@ func (d deployJob) generateEnvLayout(component manager.DeployComponent) (*manage } } -func (d deployJob) componentTask(component manager.DeployComponent, cluster, service string) *manager.Task { +func (d deployJob) componentTask(component manager.DeployComponent, cluster, service string, containerNames []string) *manager.Task { // Skip any ELP services (e.g. "ceramic-elp-1-1-node") serviceNameParts := strings.Split(service, "-") if (len(serviceNameParts) >= 2) && (serviceNameParts[1] == serviceSuffix_Elp) { @@ -260,8 +264,7 @@ func (d deployJob) componentTask(component manager.DeployComponent, cluster, ser return &manager.Task{Name: containerName_CeramicNode} } case manager.DeployComponent_Ipfs: - // All clusters have IPFS nodes - if strings.Contains(service, serviceSuffix_IpfsNode) { + if strings.Contains(service, serviceSuffix_IpfsNode) && slices.Contains(containerNames, containerName_IpfsNode) { return &manager.Task{Name: containerName_IpfsNode} } case manager.DeployComponent_Cas: @@ -272,28 +275,38 @@ func (d deployJob) componentTask(component manager.DeployComponent, cluster, ser if (cluster == "app-cas-"+d.env) && strings.Contains(service, serviceSuffix_CasScheduler) { return &manager.Task{Name: containerName_CasV5Scheduler} } + case manager.DeployComponent_RustCeramic: + if strings.Contains(service, serviceSuffix_IpfsNode) && slices.Contains(containerNames, containerName_RustCeramic) { + return &manager.Task{Name: containerName_RustCeramic} + } default: log.Printf("componentTask: unknown component: %s", component) } return nil } -func (d deployJob) componentEcrRepo(component manager.DeployComponent) (string, error) { +func (d deployJob) componentEcrRepo(component manager.DeployComponent) (*manager.Repo, error) { switch component { case manager.DeployComponent_Ceramic: - return "ceramic-prod", nil + return &manager.Repo{Name: "ceramic-prod"}, nil case manager.DeployComponent_Ipfs: - return "go-ipfs-prod", nil + return &manager.Repo{Name: "go-ipfs-prod"}, nil case manager.DeployComponent_Cas: - return "ceramic-prod-cas", nil + return &manager.Repo{Name: "ceramic-prod-cas"}, nil case manager.DeployComponent_CasV5: - return "app-cas-scheduler", nil + return &manager.Repo{Name: "app-cas-scheduler"}, nil + case manager.DeployComponent_RustCeramic: + return &manager.Repo{Name: "ceramic-one", Public: true}, nil default: - return "", fmt.Errorf("componentTask: unknown component: %s", component) + return nil, fmt.Errorf("componentEcrRepo: unknown component: %s", component) } } func (d deployJob) envBranch(component manager.DeployComponent, env manager.EnvType) string { + // All rust-ceramic deploys are currently from the "main" branch + if component == manager.DeployComponent_RustCeramic { + return envBranch_Prod + } switch env { case manager.EnvType_Dev: return envBranch_Dev diff --git a/cd/manager/models.go b/cd/manager/models.go index 2d878bf..8ef9932 100644 --- a/cd/manager/models.go +++ b/cd/manager/models.go @@ -24,19 +24,29 @@ const ( type DeployComponent string const ( - DeployComponent_Ceramic DeployComponent = "ceramic" - DeployComponent_Cas DeployComponent = "cas" - DeployComponent_CasV5 DeployComponent = "casv5" - DeployComponent_Ipfs DeployComponent = "ipfs" + DeployComponent_Ceramic DeployComponent = "ceramic" + DeployComponent_Cas DeployComponent = "cas" + DeployComponent_CasV5 DeployComponent = "casv5" + DeployComponent_Ipfs DeployComponent = "ipfs" + DeployComponent_RustCeramic DeployComponent = "rust-ceramic" ) -type DeployRepo string +type DeployRepo struct { + Org string + Name string +} + +const ( + RepoName_Ceramic = "js-ceramic" + RepoName_Cas = "ceramic-anchor-service" + RepoName_CasV5 = "go-cas" + RepoName_Ipfs = "go-ipfs-daemon" + RepoName_RustCeramic = "rust-ceramic" +) const ( - DeployRepo_Ceramic DeployRepo = "js-ceramic" - DeployRepo_Cas DeployRepo = "ceramic-anchor-service" - DeployRepo_CasV5 DeployRepo = "go-cas" - DeployRepo_Ipfs DeployRepo = "go-ipfs-daemon" + GitHubOrg_CeramicNetwork = "ceramicnetwork" + GitHubOrg_3Box = "3box" ) var ( @@ -44,8 +54,6 @@ var ( Error_CompletionTimeout = fmt.Errorf("completion timeout") ) -const GitHubOrg = "ceramicnetwork" - const ( EnvVar_Env = "ENV" ) @@ -61,27 +69,32 @@ const ( const ServiceName = "cd-manager" -// Layout (as well as Cluster, TaskSet, and Task) are a generic representation of our service structure within an -// orchestration service (e.g. AWS ECS). +// Layout (as well as Cluster, Repo, TaskSet, and Task) are a generic representation of our service structure within +// an orchestration service (e.g. AWS ECS). type Layout struct { Clusters map[string]*Cluster `dynamodbav:"clusters,omitempty"` - Repo string `dynamodbav:"repo,omitempty"` // Layout repo + Repo *Repo `dynamodbav:"repo,omitempty"` // Layout repo +} + +type Repo struct { + Name string `dynamodbav:"name,omitempty"` + Public bool `dynamodbav:"public,omitempty"` } type Cluster struct { ServiceTasks *TaskSet `dynamodbav:"serviceTasks,omitempty"` Tasks *TaskSet `dynamodbav:"tasks,omitempty"` - Repo string `dynamodbav:"repo,omitempty"` // Cluster repo override + Repo *Repo `dynamodbav:"repo,omitempty"` // Cluster repo override } type TaskSet struct { Tasks map[string]*Task `dynamodbav:"tasks,omitempty"` - Repo string `dynamodbav:"repo,omitempty"` // TaskSet repo override + Repo *Repo `dynamodbav:"repo,omitempty"` // TaskSet repo override } type Task struct { Id string `dynamodbav:"id,omitempty"` - Repo string `dynamodbav:"repo,omitempty"` // Task repo override + Repo *Repo `dynamodbav:"repo,omitempty"` // Task repo override Temp bool `dynamodbav:"temp,omitempty"` // Whether the task is meant to go down once it has completed Name string `dynamodbav:"name,omitempty"` // Container name } @@ -154,7 +167,7 @@ type Manager interface { // Repository represents a git service hosting our repositories (e.g. GitHub) type Repository interface { - GetLatestCommitHash(repo DeployRepo, branch, shaTag string) (string, error) + GetLatestCommitHash(org, repo, branch, shaTag string) (string, error) StartWorkflow(Workflow) error FindMatchingWorkflowRun(workflow Workflow, jobId string, searchTime time.Time) (int64, string, error) CheckWorkflowStatus(workflow Workflow, workflowRunId int64) (WorkflowStatus, error) diff --git a/cd/manager/notifs/discord.go b/cd/manager/notifs/discord.go index e9cbf02..ed8f7d1 100644 --- a/cd/manager/notifs/discord.go +++ b/cd/manager/notifs/discord.go @@ -184,8 +184,8 @@ func (n JobNotifs) getDeployHashes(jobState job.JobState) string { func (n JobNotifs) getComponentMsg(component manager.DeployComponent, commitHashes map[manager.DeployComponent]string) string { if commitHash, found := commitHashes[component]; found && (len(commitHash) >= shaTagLength) { - repo := manager.ComponentRepo(component) - return fmt.Sprintf("[%s (%s)](https://github.com/%s/%s/commit/%s)", repo, commitHash[:shaTagLength], manager.GitHubOrg, repo, commitHash) + repo, _ := manager.ComponentRepo(component) + return fmt.Sprintf("[%s (%s)](https://github.com/%s/%s/commit/%s)", repo.Name, commitHash[:shaTagLength], repo.Org, repo.Name, commitHash) } return "" } diff --git a/cd/manager/repository/github.go b/cd/manager/repository/github.go index 2c4c10c..83e698e 100644 --- a/cd/manager/repository/github.go +++ b/cd/manager/repository/github.go @@ -22,6 +22,7 @@ type Github struct { } const ( + github_CommitStatus_Pending string = "pending" github_CommitStatus_Failure string = "failure" github_CommitStatus_Success string = "success" ) @@ -48,11 +49,11 @@ func NewRepository() manager.Repository { return &Github{github.NewClient(httpClient)} } -func (g Github) GetLatestCommitHash(repo manager.DeployRepo, branch, shaTag string) (string, error) { +func (g Github) GetLatestCommitHash(org, repo, branch, shaTag string) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime) defer cancel() - if commits, resp, err := g.client.Repositories.ListCommits(ctx, manager.GitHubOrg, string(repo), &github.CommitsListOptions{ + if commits, resp, err := g.client.Repositories.ListCommits(ctx, org, repo, &github.CommitsListOptions{ SHA: branch, // We want to find the newest commit with all passed status checks so that we don't use a commit that doesn't // already have a corresponding Docker image in ECR, and we might as well request the maximum number of commits. @@ -65,7 +66,7 @@ func (g Github) GetLatestCommitHash(repo manager.DeployRepo, branch, shaTag stri log.Printf("getLatestCommitHash: list commits rate limit=%d, remaining=%d, resetAt=%s", resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset) for _, commit := range commits { sha := *commit.SHA - if checksPassed, err := g.checkRefStatus(repo, sha); err != nil { + if checksPassed, err := g.checkRefStatus(org, repo, sha); err != nil { return "", err } else if checksPassed { // Return the newest commit with passed checks return sha, nil @@ -79,12 +80,12 @@ func (g Github) GetLatestCommitHash(repo manager.DeployRepo, branch, shaTag stri } } -func (g Github) checkRefStatus(repo manager.DeployRepo, ref string) (bool, error) { +func (g Github) checkRefStatus(org, repo, ref string) (bool, error) { getRefStatus := func() (*github.CombinedStatus, error) { ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime) defer cancel() - status, resp, err := g.client.Repositories.GetCombinedStatus(ctx, manager.GitHubOrg, string(repo), ref, &github.ListOptions{PerPage: 100}) + status, resp, err := g.client.Repositories.GetCombinedStatus(ctx, org, repo, ref, &github.ListOptions{PerPage: 100}) log.Printf("checkRefStatus: status=%s, rate limit=%d, remaining=%d, resetAt=%s", status, resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset) return status, err } @@ -106,6 +107,13 @@ func (g Github) checkRefStatus(repo manager.DeployRepo, ref string) (bool, error } case github_CommitStatus_Failure: return false, nil + case github_CommitStatus_Pending: + // It's possible there are no statuses yet, and we consider the commit to have passed all checks. This + // shouldn't happen in reality because it is only several minutes after a commit has been made that a + // deployment job created. + if len(status.Statuses) == 0 { + return true, nil + } } } // Sleep for a few seconds so we don't get rate limited diff --git a/cd/manager/utils.go b/cd/manager/utils.go index 3de1123..cc3c133 100644 --- a/cd/manager/utils.go +++ b/cd/manager/utils.go @@ -24,18 +24,20 @@ func PrintJob(jobStates ...job.JobState) string { return prettyString } -func ComponentRepo(component DeployComponent) DeployRepo { +func ComponentRepo(component DeployComponent) (DeployRepo, error) { switch component { case DeployComponent_Ceramic: - return DeployRepo_Ceramic + return DeployRepo{Org: GitHubOrg_CeramicNetwork, Name: RepoName_Ceramic}, nil case DeployComponent_Cas: - return DeployRepo_Cas + return DeployRepo{Org: GitHubOrg_CeramicNetwork, Name: RepoName_Cas}, nil case DeployComponent_CasV5: - return DeployRepo_CasV5 + return DeployRepo{Org: GitHubOrg_CeramicNetwork, Name: RepoName_CasV5}, nil case DeployComponent_Ipfs: - return DeployRepo_Ipfs + return DeployRepo{Org: GitHubOrg_CeramicNetwork, Name: RepoName_Ipfs}, nil + case DeployComponent_RustCeramic: + return DeployRepo{Org: GitHubOrg_3Box, Name: RepoName_RustCeramic}, nil default: - return "" + return DeployRepo{}, fmt.Errorf("componentRepo: unknown component: %s", component) } }