Skip to content

Commit

Permalink
feat(cd): rust-ceramic deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Nov 6, 2023
1 parent 04a76aa commit 78c6e5e
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 62 deletions.
45 changes: 32 additions & 13 deletions cd/manager/common/aws/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
)

const resourceTag = "Ceramic"
const publicEcrUri = "public.ecr.aws/r5b3e0r5/3box/"

func NewEcs(cfg aws.Config) manager.Deployment {
ecrUri := os.Getenv("AWS_ACCOUNT_ID") + ".dkr.ecr." + os.Getenv("AWS_REGION") + ".amazonaws.com/"
Expand Down Expand Up @@ -123,12 +124,23 @@ func (e Ecs) GetLayout(clusters []string) (*manager.Layout, error) {
layout.Clusters[cluster] = &manager.Cluster{ServiceTasks: &manager.TaskSet{Tasks: map[string]*manager.Task{}}}
for _, serviceArn := range clusterServices.ServiceArns {
service := e.serviceNameFromArn(serviceArn)
ecsService, err := e.describeEcsService(cluster, service)
if err != nil {
if ecsService, err := e.describeEcsService(cluster, service); err != nil {
log.Printf("getLayout: describe service error: %s, %s, %v", cluster, service, err)
return nil, err
} else {
taskDefArn := *ecsService.Services[0].TaskDefinition
containerDefNames := make([]string, 0, 1)
if taskDef, err := e.getEcsTaskDefinition(taskDefArn); err != nil {
log.Printf("getLayout: get task def error: %s, %s, %s, %v", taskDefArn, cluster, service, err)
return nil, err
} else {
for _, containerDef := range taskDef.ContainerDefinitions {
containerDefNames = append(containerDefNames, *containerDef.Name)
}
}
// Return the names of all the containers associated with this task definition
layout.Clusters[cluster].ServiceTasks.Tasks[service] = &manager.Task{Id: taskDefArn, Name: strings.Join(containerDefNames, ",")}
}
layout.Clusters[cluster].ServiceTasks.Tasks[service] = &manager.Task{Id: *ecsService.Services[0].TaskDefinition}
}
}
}
Expand All @@ -137,9 +149,9 @@ func (e Ecs) GetLayout(clusters []string) (*manager.Layout, error) {

func (e Ecs) UpdateLayout(layout *manager.Layout, commitHash string) error {
for clusterName, cluster := range layout.Clusters {
clusterRepo := layout.Repo
if len(cluster.Repo) > 0 {
clusterRepo = cluster.Repo
clusterRepo := e.getEcrRepo(*layout.Repo) // The main layout repo should never be null
if cluster.Repo != nil {
clusterRepo = e.getEcrRepo(*cluster.Repo)
}
if err := e.updateEnvCluster(cluster, clusterName, clusterRepo, commitHash); err != nil {
return err
Expand Down Expand Up @@ -242,7 +254,7 @@ func (e Ecs) updateEcsTaskDefinition(taskDefArn, image, containerName string) (s

for idx, containerDef := range taskDef.ContainerDefinitions {
if *containerDef.Name == containerName {
taskDef.ContainerDefinitions[idx].Image = aws.String(e.ecrUri + image)
taskDef.ContainerDefinitions[idx].Image = aws.String(image)
regTaskDefInput := &ecs.RegisterTaskDefinitionInput{
ContainerDefinitions: taskDef.ContainerDefinitions,
Family: taskDef.Family,
Expand Down Expand Up @@ -432,8 +444,8 @@ func (e Ecs) updateEnvTaskSet(taskSet *manager.TaskSet, deployType string, clust
if taskSet != nil {
for taskSetName, task := range taskSet.Tasks {
taskSetRepo := clusterRepo
if len(taskSet.Repo) > 0 {
taskSetRepo = taskSet.Repo
if taskSet.Repo != nil {
taskSetRepo = e.getEcrRepo(*taskSet.Repo)
}
switch deployType {
case deployType_Service:
Expand All @@ -454,8 +466,8 @@ func (e Ecs) updateEnvTaskSet(taskSet *manager.TaskSet, deployType string, clust

func (e Ecs) updateEnvServiceTask(task *manager.Task, cluster, service, taskSetRepo, commitHash string) error {
taskRepo := taskSetRepo
if len(task.Repo) > 0 {
taskRepo = task.Repo
if task.Repo != nil {
taskRepo = e.getEcrRepo(*task.Repo)
}
if id, err := e.updateEcsService(cluster, service, taskRepo+":"+commitHash, task.Name, task.Temp); err != nil {
return err
Expand All @@ -467,8 +479,8 @@ func (e Ecs) updateEnvServiceTask(task *manager.Task, cluster, service, taskSetR

func (e Ecs) updateEnvTask(task *manager.Task, cluster, taskName, taskSetRepo, commitHash string) error {
taskRepo := taskSetRepo
if len(task.Repo) > 0 {
taskRepo = task.Repo
if task.Repo != nil {
taskRepo = e.getEcrRepo(*task.Repo)
}
if id, err := e.updateEcsTask(cluster, taskName, taskRepo+":"+commitHash, task.Name, task.Temp); err != nil {
return err
Expand Down Expand Up @@ -547,3 +559,10 @@ func (e Ecs) parseEcsFailures(ecsFailures []types.Failure) []ecsFailure {
}
return failures
}

func (e Ecs) getEcrRepo(repo manager.Repo) string {
if repo.Public {
return publicEcrUri + repo.Name
}
return e.ecrUri + repo.Name
}
49 changes: 31 additions & 18 deletions cd/manager/jobs/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"golang.org/x/exp/slices"

"github.com/3box/pipeline-tools/cd/manager"
"github.com/3box/pipeline-tools/cd/manager/common/job"
)
Expand Down Expand Up @@ -47,6 +49,7 @@ const (
containerName_CasApi string = "cas_api"
containerName_CasWorker string = "cas_anchor"
containerName_CasV5Scheduler string = "scheduler"
containerName_RustCeramic string = "rust-ceramic"
)

const defaultFailureTime = 30 * time.Minute
Expand Down Expand Up @@ -146,8 +149,11 @@ func (d deployJob) prepareJob(deployHashes map[manager.DeployComponent]string) e
// The last two cases will only happen when redeploying manually, so we can note that in the notification.
if d.sha == buildHashLatest {
shaTag, _ := d.state.Params[job.DeployJobParam_ShaTag].(string)
if latestSha, err := d.repo.GetLatestCommitHash(
manager.ComponentRepo(d.component),
if repo, err := manager.ComponentRepo(d.component); err != nil {
return err
} else if latestSha, err := d.repo.GetLatestCommitHash(
repo.Org,
repo.Name,
d.envBranch(d.component, manager.EnvType(os.Getenv(manager.EnvVar_Env))),
shaTag,
); err != nil {
Expand Down Expand Up @@ -205,21 +211,19 @@ func (d deployJob) generateEnvLayout(component manager.DeployComponent) (*manage
publicCluster := "ceramic-" + d.env + "-ex"
casCluster := "ceramic-" + d.env + "-cas"
casV5Cluster := "app-cas-" + d.env
ecrRepo, err := d.componentEcrRepo(component)
if err != nil {
log.Printf("generateEnvLayout: ecr repo error: %s, %v", component, err)
return nil, err
}
clusters := []string{privateCluster, publicCluster, casCluster, casV5Cluster}
// Populate the service layout by retrieving the clusters/services from ECS
if currentLayout, err := d.d.GetLayout(clusters); err != nil {
log.Printf("generateEnvLayout: get layout error: %s, %v", component, err)
return nil, err
} else if ecrRepo, err := d.componentEcrRepo(component); err != nil {
log.Printf("generateEnvLayout: get ecr repo error: %s, %v", component, err)
return nil, err
} else {
newLayout := &manager.Layout{Clusters: map[string]*manager.Cluster{}, Repo: ecrRepo}
for cluster, clusterLayout := range currentLayout.Clusters {
for service, task := range clusterLayout.ServiceTasks.Tasks {
if newTask := d.componentTask(component, cluster, service); newTask != nil {
if newTask := d.componentTask(component, cluster, service, strings.Split(task.Name, ",")); newTask != nil {
if newLayout.Clusters[cluster] == nil {
// We found at least one matching task, so we can start populating the cluster layout.
newLayout.Clusters[cluster] = &manager.Cluster{ServiceTasks: &manager.TaskSet{Tasks: map[string]*manager.Task{}}}
Expand All @@ -237,7 +241,7 @@ func (d deployJob) generateEnvLayout(component manager.DeployComponent) (*manage
if component == manager.DeployComponent_Cas {
newLayout.Clusters[casCluster].Tasks = &manager.TaskSet{Tasks: map[string]*manager.Task{
casCluster + "-" + serviceSuffix_CasWorker: {
Repo: anchorWorkerRepo,
Repo: &manager.Repo{Name: anchorWorkerRepo},
Temp: true, // Anchor workers do not stay up permanently
Name: containerName_CasWorker,
},
Expand All @@ -247,7 +251,7 @@ func (d deployJob) generateEnvLayout(component manager.DeployComponent) (*manage
}
}

func (d deployJob) componentTask(component manager.DeployComponent, cluster, service string) *manager.Task {
func (d deployJob) componentTask(component manager.DeployComponent, cluster, service string, containerNames []string) *manager.Task {
// Skip any ELP services (e.g. "ceramic-elp-1-1-node")
serviceNameParts := strings.Split(service, "-")
if (len(serviceNameParts) >= 2) && (serviceNameParts[1] == serviceSuffix_Elp) {
Expand All @@ -260,8 +264,7 @@ func (d deployJob) componentTask(component manager.DeployComponent, cluster, ser
return &manager.Task{Name: containerName_CeramicNode}
}
case manager.DeployComponent_Ipfs:
// All clusters have IPFS nodes
if strings.Contains(service, serviceSuffix_IpfsNode) {
if strings.Contains(service, serviceSuffix_IpfsNode) && slices.Contains(containerNames, containerName_IpfsNode) {
return &manager.Task{Name: containerName_IpfsNode}
}
case manager.DeployComponent_Cas:
Expand All @@ -272,28 +275,38 @@ func (d deployJob) componentTask(component manager.DeployComponent, cluster, ser
if (cluster == "app-cas-"+d.env) && strings.Contains(service, serviceSuffix_CasScheduler) {
return &manager.Task{Name: containerName_CasV5Scheduler}
}
case manager.DeployComponent_RustCeramic:
if strings.Contains(service, serviceSuffix_IpfsNode) && slices.Contains(containerNames, containerName_RustCeramic) {
return &manager.Task{Name: containerName_RustCeramic}
}
default:
log.Printf("componentTask: unknown component: %s", component)
}
return nil
}

func (d deployJob) componentEcrRepo(component manager.DeployComponent) (string, error) {
func (d deployJob) componentEcrRepo(component manager.DeployComponent) (*manager.Repo, error) {
switch component {
case manager.DeployComponent_Ceramic:
return "ceramic-prod", nil
return &manager.Repo{Name: "ceramic-prod"}, nil
case manager.DeployComponent_Ipfs:
return "go-ipfs-prod", nil
return &manager.Repo{Name: "go-ipfs-prod"}, nil
case manager.DeployComponent_Cas:
return "ceramic-prod-cas", nil
return &manager.Repo{Name: "ceramic-prod-cas"}, nil
case manager.DeployComponent_CasV5:
return "app-cas-scheduler", nil
return &manager.Repo{Name: "app-cas-scheduler"}, nil
case manager.DeployComponent_RustCeramic:
return &manager.Repo{Name: "ceramic-one", Public: true}, nil
default:
return "", fmt.Errorf("componentTask: unknown component: %s", component)
return nil, fmt.Errorf("componentEcrRepo: unknown component: %s", component)
}
}

func (d deployJob) envBranch(component manager.DeployComponent, env manager.EnvType) string {
// All rust-ceramic deploys are currently from the "main" branch
if component == manager.DeployComponent_RustCeramic {
return envBranch_Prod
}
switch env {
case manager.EnvType_Dev:
return envBranch_Dev
Expand Down
49 changes: 31 additions & 18 deletions cd/manager/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,36 @@ const (
type DeployComponent string

const (
DeployComponent_Ceramic DeployComponent = "ceramic"
DeployComponent_Cas DeployComponent = "cas"
DeployComponent_CasV5 DeployComponent = "casv5"
DeployComponent_Ipfs DeployComponent = "ipfs"
DeployComponent_Ceramic DeployComponent = "ceramic"
DeployComponent_Cas DeployComponent = "cas"
DeployComponent_CasV5 DeployComponent = "casv5"
DeployComponent_Ipfs DeployComponent = "ipfs"
DeployComponent_RustCeramic DeployComponent = "rust-ceramic"
)

type DeployRepo string
type DeployRepo struct {
Org string
Name string
}

const (
RepoName_Ceramic = "js-ceramic"
RepoName_Cas = "ceramic-anchor-service"
RepoName_CasV5 = "go-cas"
RepoName_Ipfs = "go-ipfs-daemon"
RepoName_RustCeramic = "rust-ceramic"
)

const (
DeployRepo_Ceramic DeployRepo = "js-ceramic"
DeployRepo_Cas DeployRepo = "ceramic-anchor-service"
DeployRepo_CasV5 DeployRepo = "go-cas"
DeployRepo_Ipfs DeployRepo = "go-ipfs-daemon"
GitHubOrg_CeramicNetwork = "ceramicnetwork"
GitHubOrg_3Box = "3box"
)

var (
Error_StartupTimeout = fmt.Errorf("startup timeout")
Error_CompletionTimeout = fmt.Errorf("completion timeout")
)

const GitHubOrg = "ceramicnetwork"

const (
EnvVar_Env = "ENV"
)
Expand All @@ -61,27 +69,32 @@ const (

const ServiceName = "cd-manager"

// Layout (as well as Cluster, TaskSet, and Task) are a generic representation of our service structure within an
// orchestration service (e.g. AWS ECS).
// Layout (as well as Cluster, Repo, TaskSet, and Task) are a generic representation of our service structure within
// an orchestration service (e.g. AWS ECS).
type Layout struct {
Clusters map[string]*Cluster `dynamodbav:"clusters,omitempty"`
Repo string `dynamodbav:"repo,omitempty"` // Layout repo
Repo *Repo `dynamodbav:"repo,omitempty"` // Layout repo
}

type Repo struct {
Name string `dynamodbav:"name,omitempty"`
Public bool `dynamodbav:"public,omitempty"`
}

type Cluster struct {
ServiceTasks *TaskSet `dynamodbav:"serviceTasks,omitempty"`
Tasks *TaskSet `dynamodbav:"tasks,omitempty"`
Repo string `dynamodbav:"repo,omitempty"` // Cluster repo override
Repo *Repo `dynamodbav:"repo,omitempty"` // Cluster repo override
}

type TaskSet struct {
Tasks map[string]*Task `dynamodbav:"tasks,omitempty"`
Repo string `dynamodbav:"repo,omitempty"` // TaskSet repo override
Repo *Repo `dynamodbav:"repo,omitempty"` // TaskSet repo override
}

type Task struct {
Id string `dynamodbav:"id,omitempty"`
Repo string `dynamodbav:"repo,omitempty"` // Task repo override
Repo *Repo `dynamodbav:"repo,omitempty"` // Task repo override
Temp bool `dynamodbav:"temp,omitempty"` // Whether the task is meant to go down once it has completed
Name string `dynamodbav:"name,omitempty"` // Container name
}
Expand Down Expand Up @@ -154,7 +167,7 @@ type Manager interface {

// Repository represents a git service hosting our repositories (e.g. GitHub)
type Repository interface {
GetLatestCommitHash(repo DeployRepo, branch, shaTag string) (string, error)
GetLatestCommitHash(org, repo, branch, shaTag string) (string, error)
StartWorkflow(Workflow) error
FindMatchingWorkflowRun(workflow Workflow, jobId string, searchTime time.Time) (int64, string, error)
CheckWorkflowStatus(workflow Workflow, workflowRunId int64) (WorkflowStatus, error)
Expand Down
4 changes: 2 additions & 2 deletions cd/manager/notifs/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func (n JobNotifs) getDeployHashes(jobState job.JobState) string {

func (n JobNotifs) getComponentMsg(component manager.DeployComponent, commitHashes map[manager.DeployComponent]string) string {
if commitHash, found := commitHashes[component]; found && (len(commitHash) >= shaTagLength) {
repo := manager.ComponentRepo(component)
return fmt.Sprintf("[%s (%s)](https://github.com/%s/%s/commit/%s)", repo, commitHash[:shaTagLength], manager.GitHubOrg, repo, commitHash)
repo, _ := manager.ComponentRepo(component)
return fmt.Sprintf("[%s (%s)](https://github.com/%s/%s/commit/%s)", repo.Name, commitHash[:shaTagLength], repo.Org, repo.Name, commitHash)
}
return ""
}
Expand Down
18 changes: 13 additions & 5 deletions cd/manager/repository/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Github struct {
}

const (
github_CommitStatus_Pending string = "pending"
github_CommitStatus_Failure string = "failure"
github_CommitStatus_Success string = "success"
)
Expand All @@ -48,11 +49,11 @@ func NewRepository() manager.Repository {
return &Github{github.NewClient(httpClient)}
}

func (g Github) GetLatestCommitHash(repo manager.DeployRepo, branch, shaTag string) (string, error) {
func (g Github) GetLatestCommitHash(org, repo, branch, shaTag string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()

if commits, resp, err := g.client.Repositories.ListCommits(ctx, manager.GitHubOrg, string(repo), &github.CommitsListOptions{
if commits, resp, err := g.client.Repositories.ListCommits(ctx, org, repo, &github.CommitsListOptions{
SHA: branch,
// We want to find the newest commit with all passed status checks so that we don't use a commit that doesn't
// already have a corresponding Docker image in ECR, and we might as well request the maximum number of commits.
Expand All @@ -65,7 +66,7 @@ func (g Github) GetLatestCommitHash(repo manager.DeployRepo, branch, shaTag stri
log.Printf("getLatestCommitHash: list commits rate limit=%d, remaining=%d, resetAt=%s", resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset)
for _, commit := range commits {
sha := *commit.SHA
if checksPassed, err := g.checkRefStatus(repo, sha); err != nil {
if checksPassed, err := g.checkRefStatus(org, repo, sha); err != nil {
return "", err
} else if checksPassed { // Return the newest commit with passed checks
return sha, nil
Expand All @@ -79,12 +80,12 @@ func (g Github) GetLatestCommitHash(repo manager.DeployRepo, branch, shaTag stri
}
}

func (g Github) checkRefStatus(repo manager.DeployRepo, ref string) (bool, error) {
func (g Github) checkRefStatus(org, repo, ref string) (bool, error) {
getRefStatus := func() (*github.CombinedStatus, error) {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()

status, resp, err := g.client.Repositories.GetCombinedStatus(ctx, manager.GitHubOrg, string(repo), ref, &github.ListOptions{PerPage: 100})
status, resp, err := g.client.Repositories.GetCombinedStatus(ctx, org, repo, ref, &github.ListOptions{PerPage: 100})
log.Printf("checkRefStatus: status=%s, rate limit=%d, remaining=%d, resetAt=%s", status, resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset)
return status, err
}
Expand All @@ -106,6 +107,13 @@ func (g Github) checkRefStatus(repo manager.DeployRepo, ref string) (bool, error
}
case github_CommitStatus_Failure:
return false, nil
case github_CommitStatus_Pending:
// It's possible there are no statuses yet, and we consider the commit to have passed all checks. This
// shouldn't happen in reality because it is only several minutes after a commit has been made that a
// deployment job created.
if len(status.Statuses) == 0 {
return true, nil
}
}
}
// Sleep for a few seconds so we don't get rate limited
Expand Down
Loading

0 comments on commit 78c6e5e

Please sign in to comment.