Implement TTL for AsyncAPI workloads #2151

Merged 17 commits on May 7, 2021
16 changes: 8 additions & 8 deletions async-gateway/main.go
@@ -78,11 +78,11 @@ func main() {
}()

var (
-    port        = flag.String("port", _defaultPort, "port on which the gateway server runs")
-    queueURL    = flag.String("queue", "", "SQS queue URL")
-    region      = flag.String("region", "", "AWS region")
-    bucket      = flag.String("bucket", "", "AWS bucket")
-    clusterName = flag.String("cluster", "", "cluster name")
+    port       = flag.String("port", _defaultPort, "port on which the gateway server runs")
+    queueURL   = flag.String("queue", "", "SQS queue URL")
+    region     = flag.String("region", "", "AWS region")
+    bucket     = flag.String("bucket", "", "AWS bucket")
+    clusterUID = flag.String("cluster-uid", "", "cluster UID")
)
flag.Parse()

@@ -93,8 +93,8 @@ func main() {
log.Fatal("missing required option: -region")
case *bucket == "":
log.Fatal("missing required option: -bucket")
-case *clusterName == "":
-    log.Fatal("missing required option: -cluster")
+case *clusterUID == "":
+    log.Fatal("missing required option: -cluster-uid")
}

apiName := flag.Arg(0)
@@ -115,7 +115,7 @@ func main() {
s3Storage := NewS3(sess, *bucket)
sqsQueue := NewSQS(*queueURL, sess)

-svc := NewService(*clusterName, apiName, sqsQueue, s3Storage, log)
+svc := NewService(*clusterUID, apiName, sqsQueue, s3Storage, log)
ep := NewEndpoint(svc, log)

router := mux.NewRouter()
24 changes: 12 additions & 12 deletions async-gateway/service.go
@@ -32,21 +32,21 @@ type Service interface {
}

type service struct {
-    logger      *zap.Logger
-    queue       Queue
-    storage     Storage
-    clusterName string
-    apiName     string
+    logger     *zap.Logger
+    queue      Queue
+    storage    Storage
+    clusterUID string
+    apiName    string
}

// NewService creates a new async-gateway service
-func NewService(clusterName, apiName string, queue Queue, storage Storage, logger *zap.Logger) Service {
+func NewService(clusterUID, apiName string, queue Queue, storage Storage, logger *zap.Logger) Service {
return &service{
-        logger:      logger,
-        queue:       queue,
-        storage:     storage,
-        clusterName: clusterName,
-        apiName:     apiName,
+        logger:     logger,
+        queue:      queue,
+        storage:    storage,
+        clusterUID: clusterUID,
+        apiName:    apiName,
}
}

@@ -151,5 +151,5 @@ func (s *service) getStatus(id string) (Status, error) {
}

func (s *service) workloadStoragePrefix() string {
-    return fmt.Sprintf("%s/apis/%s/workloads", s.clusterName, s.apiName)
+    return fmt.Sprintf("%s/workloads/%s", s.clusterUID, s.apiName)
}
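
Side note on the prefix change above: grouping every async workload under a single <cluster uid>/workloads/ prefix is what lets the TTL below be expressed as one S3 lifecycle rule per cluster (see setLifecycleRulesOnClusterUp in cli/cmd/cluster.go further down). A minimal sketch of the before/after key layout (the UID and API name are hypothetical):

package main

import "fmt"

func main() {
    clusterName, clusterUID, apiName := "my-cluster", "1620399923", "text-generator"

    // old layout: keyed by cluster name, workloads nested under each API
    fmt.Printf("%s/apis/%s/workloads\n", clusterName, apiName) // my-cluster/apis/text-generator/workloads

    // new layout: keyed by cluster UID, all workloads grouped under one prefix
    fmt.Printf("%s/workloads/%s\n", clusterUID, apiName) // 1620399923/workloads/text-generator
}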
90 changes: 89 additions & 1 deletion cli/cmd/cluster.go
@@ -28,9 +28,11 @@ import (
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/elbv2"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/cortexlabs/cortex/cli/cluster"
"github.com/cortexlabs/cortex/cli/types/cliconfig"
"github.com/cortexlabs/cortex/cli/types/flags"
"github.com/cortexlabs/cortex/pkg/consts"
"github.com/cortexlabs/cortex/pkg/lib/archive"
"github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/console"
@@ -45,6 +47,7 @@ import (
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/lib/table"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
+    libtime "github.com/cortexlabs/cortex/pkg/lib/time"
"github.com/cortexlabs/cortex/pkg/operator/schema"
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
"github.com/cortexlabs/cortex/pkg/types/clusterstate"
@@ -187,6 +190,11 @@ var _clusterUpCmd = &cobra.Command{
exit.Error(err)
}

+err = setLifecycleRulesOnClusterUp(awsClient, clusterConfig.Bucket, clusterConfig.ClusterUID)
+if err != nil {
+    exit.Error(err)
+}

err = createLogGroupIfNotFound(awsClient, clusterConfig.ClusterName, clusterConfig.Tags)
if err != nil {
exit.Error(err)
Expand Down Expand Up @@ -426,6 +434,7 @@ var _clusterDownCmd = &cobra.Command{
if err != nil {
exit.Error(err)
}
+bucketName := clusterconfig.BucketName(accountID, accessConfig.ClusterName, accessConfig.Region)

warnIfNotAdmin(awsClient)

@@ -480,11 +489,24 @@ var _clusterDownCmd = &cobra.Command{
fmt.Println()
} else if exitCode == nil || *exitCode != 0 {
out = filterEKSCTLOutput(out)
-helpStr := fmt.Sprintf("\nNote: if this error cannot be resolved, please ensure that all CloudFormation stacks for this cluster eventually become fully deleted (%s). If the stack deletion process has failed, please delete the stacks directly from the AWS console (this may require manually deleting particular AWS resources that are blocking the stack deletion)", clusterstate.CloudFormationURL(accessConfig.ClusterName, accessConfig.Region))
+template := "\nNote: if this error cannot be resolved, please ensure that all CloudFormation stacks for this cluster eventually become fully deleted (%s)."
+template += " If the stack deletion process has failed, please delete the stacks directly from the AWS console (this may require manually deleting particular AWS resources that are blocking the stack deletion)."
+template += " In addition to deleting the stacks manually from the AWS console, also make sure to empty and remove the %s bucket"
+helpStr := fmt.Sprintf(template, clusterstate.CloudFormationURL(accessConfig.ClusterName, accessConfig.Region), bucketName)
fmt.Println(helpStr)
exit.Error(ErrorClusterDown(out + helpStr))
}

+// set lifecycle policy to clean the bucket
+fmt.Printf("○ setting lifecycle policy to empty the %s bucket ", bucketName)
+err = setLifecycleRulesOnClusterDown(awsClient, bucketName)
+if err != nil {
+    fmt.Printf("\n\nfailed to set lifecycle policy to empty the %s bucket; you can remove the bucket manually via the s3 console: https://s3.console.aws.amazon.com/s3/management/%s\n", bucketName, bucketName)
+    errors.PrintError(err)
+    fmt.Println()
+}
+fmt.Println("✓")

// delete policy after spinning down the cluster (which deletes the roles) because policies can't be deleted if they are attached to roles
policyARN := clusterconfig.DefaultPolicyARN(accountID, accessConfig.ClusterName, accessConfig.Region)
fmt.Printf("○ deleting auto-generated iam policy %s ", policyARN)
@@ -549,6 +571,7 @@ var _clusterDownCmd = &cobra.Command{
}

fmt.Printf("\nplease check CloudFormation to ensure that all resources for the %s cluster eventually become successfully deleted: %s\n", accessConfig.ClusterName, clusterstate.CloudFormationURL(accessConfig.ClusterName, accessConfig.Region))
fmt.Printf("\na lifecycle rule has been applied to the cluster’s %s bucket to empty its contents later today; you can delete the %s bucket via the s3 console once it has been emptied: https://s3.console.aws.amazon.com/s3/management/%s\n", bucketName, bucketName, bucketName)

cachedClusterConfigPath := cachedClusterConfigPath(accessConfig.ClusterName, accessConfig.Region)
os.Remove(cachedClusterConfigPath)
@@ -1119,6 +1142,71 @@ func createS3BucketIfNotFound(awsClient *aws.Client, bucket string, tags map[str
return err
}

+func setLifecycleRulesOnClusterUp(awsClient *aws.Client, bucket, newClusterUID string) error {
+    err := awsClient.DeleteLifecycleRules(bucket)
+    if err != nil {
+        return err
+    }
+
+    clusterUIDs, err := awsClient.ListS3TopLevelDirs(bucket)
+    if err != nil {
+        return err
+    }
+
+    if len(clusterUIDs)+1 > consts.MaxBucketLifecycleRules {
+        return ErrorClusterUIDsLimitInBucket(bucket)
+    }
+
+    expirationDate := libtime.GetCurrentUTCDate().Add(-24 * time.Hour)
+    rules := []s3.LifecycleRule{}
+    for _, clusterUID := range clusterUIDs {
+        rules = append(rules, s3.LifecycleRule{
+            Expiration: &s3.LifecycleExpiration{
+                Date: &expirationDate,
+            },
+            ID: pointer.String("cluster-remove-" + clusterUID),
+            Filter: &s3.LifecycleRuleFilter{
+                Prefix: pointer.String(s.EnsureSuffix(clusterUID, "/")),
+            },
+            Status: pointer.String("Enabled"),
+        })
+    }
+
+    rules = append(rules, s3.LifecycleRule{
+        Expiration: &s3.LifecycleExpiration{
+            Days: pointer.Int64(consts.AsyncWorkloadsExpirationDays),
+        },
+        ID: pointer.String("async-workloads-expiry-policy"),
+        Filter: &s3.LifecycleRuleFilter{
+            Prefix: pointer.String(s.EnsureSuffix(filepath.Join(newClusterUID, "workloads"), "/")),
+        },
+        Status: pointer.String("Enabled"),
+    })
+
+    return awsClient.SetLifecycleRules(bucket, rules)
+}
+
+func setLifecycleRulesOnClusterDown(awsClient *aws.Client, bucket string) error {
+    err := awsClient.DeleteLifecycleRules(bucket)
+    if err != nil {
+        return err
+    }
+
+    expirationDate := libtime.GetCurrentUTCDate().Add(-24 * time.Hour)
+    return awsClient.SetLifecycleRules(bucket, []s3.LifecycleRule{
+        {
+            Expiration: &s3.LifecycleExpiration{
+                Date: &expirationDate,
+            },
+            ID: pointer.String("bucket-cleaner"),
+            Filter: &s3.LifecycleRuleFilter{
+                Prefix: pointer.String(""),
+            },
+            Status: pointer.String("Enabled"),
+        },
+    })
+}

func createLogGroupIfNotFound(awsClient *aws.Client, logGroup string, tags map[string]string) error {
logGroupFound, err := awsClient.DoesLogGroupExist(logGroup)
if err != nil {
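
To sanity-check what these helpers install, here is a minimal sketch that reads back a bucket's lifecycle configuration with the same aws-sdk-go s3 package the diff imports (the region and bucket name are placeholders; DeleteLifecycleRules and SetLifecycleRules are Cortex's own wrappers, so this inspects the raw S3 state):

package main

import (
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
)

func main() {
    sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-west-2")}))
    svc := s3.New(sess)

    // fetch the bucket's current lifecycle rules ("cortex-mybucket" is a placeholder)
    out, err := svc.GetBucketLifecycleConfiguration(&s3.GetBucketLifecycleConfigurationInput{
        Bucket: aws.String("cortex-mybucket"),
    })
    if err != nil {
        log.Fatal(err)
    }

    // expect e.g. "async-workloads-expiry-policy" (Days-based) scoped to "<cluster uid>/workloads/",
    // or "bucket-cleaner" (Date-based, whole bucket) after cluster down
    for _, rule := range out.Rules {
        prefix := ""
        if rule.Filter != nil {
            prefix = aws.StringValue(rule.Filter.Prefix)
        }
        fmt.Printf("%s: prefix=%q status=%s\n", aws.StringValue(rule.ID), prefix, aws.StringValue(rule.Status))
    }
}

Note the trick in setLifecycleRulesOnClusterDown: setting the rule's expiration Date 24 hours in the past marks every object as already expirable, and since S3 evaluates lifecycle rules roughly once per day, the bucket is emptied asynchronously (hence the "later today" wording in the CLI message).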
8 changes: 8 additions & 0 deletions cli/cmd/errors.go
@@ -71,6 +71,7 @@ const (
ErrAPINameMustBeProvided = "cli.api_name_must_be_provided"
ErrAPINotFoundInConfig = "cli.api_not_found_in_config"
ErrNotSupportedForKindAndType = "cli.not_supported_for_kind_and_type"
+    ErrClusterUIDsLimitInBucket = "cli.cluster_uids_limit_in_bucket"
)

func ErrorInvalidProvider(providerStr, cliConfigPath string) error {
@@ -296,3 +297,10 @@ func ErrorNotSupportedForKindAndType(kind userconfig.Kind, handlerType userconfi
},
})
}

+func ErrorClusterUIDsLimitInBucket(bucket string) error {
+    return errors.WithStack(&errors.Error{
+        Kind:    ErrClusterUIDsLimitInBucket,
+        Message: fmt.Sprintf("detected too many top level folders in %s bucket; please empty your bucket and try again", bucket),
+    })
+}
20 changes: 10 additions & 10 deletions enqueuer/enqueuer.go
@@ -40,12 +40,12 @@ const (
)

type EnvConfig struct {
-    ClusterName string
-    Region      string
-    Version     string
-    Bucket      string
-    APIName     string
-    JobID       string
+    ClusterUID string
+    Region     string
+    Version    string
+    Bucket     string
+    APIName    string
+    JobID      string
}

// FIXME: all these types should be shared with the cortex webserver (from where the payload is submitted)
@@ -159,13 +159,13 @@ func (e *Enqueuer) Enqueue() (int, error) {
}

func (e *Enqueuer) UploadBatchCount(batchCount int) error {
-    key := spec.JobBatchCountKey(e.envConfig.ClusterName, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
+    key := spec.JobBatchCountKey(e.envConfig.ClusterUID, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
return e.aws.UploadStringToS3(s.Int(batchCount), e.envConfig.Bucket, key)
}

func (e *Enqueuer) getJobPayload() (JobSubmission, error) {
-    // e.g. <cluster name>/jobs/<job_api_kind>/<cortex version>/<api_name>/<job_id>
-    key := spec.JobPayloadKey(e.envConfig.ClusterName, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
+    // e.g. <cluster uid>/jobs/<job_api_kind>/<cortex version>/<api_name>/<job_id>
+    key := spec.JobPayloadKey(e.envConfig.ClusterUID, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)

submissionBytes, err := e.aws.ReadBytesFromS3(e.envConfig.Bucket, key)
if err != nil {
Expand All @@ -181,7 +181,7 @@ func (e *Enqueuer) getJobPayload() (JobSubmission, error) {
}

func (e *Enqueuer) deleteJobPayload() error {
-    key := spec.JobPayloadKey(e.envConfig.ClusterName, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
+    key := spec.JobPayloadKey(e.envConfig.ClusterUID, userconfig.BatchAPIKind, e.envConfig.APIName, e.envConfig.JobID)
if err := e.aws.DeleteS3File(e.envConfig.Bucket, key); err != nil {
return err
}
2 changes: 1 addition & 1 deletion enqueuer/helpers.go
@@ -95,7 +95,7 @@ func s3IteratorFromLister(awsClient *awslib.Client, s3Lister S3Lister, fn func(s
return err
}

-    err = awsClientForBucket.S3Iterator(bucket, key, false, nil, func(s3Obj *s3.Object) (bool, error) {
+    err = awsClientForBucket.S3Iterator(bucket, key, false, nil, nil, func(s3Obj *s3.Object) (bool, error) {
s3FilePath := awslib.S3Path(bucket, *s3Obj.Key)

shouldSkip := false
30 changes: 15 additions & 15 deletions enqueuer/main.go
@@ -61,14 +61,14 @@ func createLogger() (*zap.Logger, error) {

func main() {
var (
-        clusterName string
-        region      string
-        bucket      string
-        queueURL    string
-        apiName     string
-        jobID       string
+        clusterUID string
+        region     string
+        bucket     string
+        queueURL   string
+        apiName    string
+        jobID      string
)
-    flag.StringVar(&clusterName, "cluster", os.Getenv("CORTEX_CLUSTER_NAME"), "cluster name (can be set through the CORTEX_CLUSTER_NAME env variable)")
+    flag.StringVar(&clusterUID, "cluster-uid", os.Getenv("CORTEX_CLUSTER_UID"), "cluster UID (can be set through the CORTEX_CLUSTER_UID env variable)")
    flag.StringVar(&region, "region", os.Getenv("CORTEX_REGION"), "cluster region (can be set through the CORTEX_REGION env variable)")
    flag.StringVar(&bucket, "bucket", os.Getenv("CORTEX_BUCKET"), "cortex S3 bucket (can be set through the CORTEX_BUCKET env variable)")
flag.StringVar(&queueURL, "queue", "", "target queue URL to where the api messages will be enqueued")
@@ -91,8 +91,8 @@ func main() {
}()

switch {
-    case clusterName == "":
-        log.Fatal("-cluster is a required option")
+    case clusterUID == "":
+        log.Fatal("-cluster-uid is a required option")
case region == "":
log.Fatal("-region is a required option")
case bucket == "":
@@ -106,12 +106,12 @@
}

envConfig := EnvConfig{
-        ClusterName: clusterName,
-        Region:      region,
-        Version:     version,
-        Bucket:      bucket,
-        APIName:     apiName,
-        JobID:       jobID,
+        ClusterUID: clusterUID,
+        Region:     region,
+        Version:    version,
+        Bucket:     bucket,
+        APIName:    apiName,
+        JobID:      jobID,
}

enqueuer, err := NewEnqueuer(envConfig, queueURL, log)
21 changes: 16 additions & 5 deletions manager/render_template.py
@@ -18,9 +18,16 @@
import pathlib
from jinja2 import Environment, FileSystemLoader

+# python render_template.py [CLUSTER_CONFIG_PATH] TEMPLATE_PATH
if __name__ == "__main__":
-    cluster_config_path = sys.argv[1]
-    template_path = pathlib.Path(sys.argv[2])
+    if len(sys.argv) == 3:
+        yaml_file_path = sys.argv[1]
+        template_path = pathlib.Path(sys.argv[2])
+    elif len(sys.argv) == 2:
+        yaml_file_path = None
+        template_path = pathlib.Path(sys.argv[1])
+    else:
+        raise RuntimeError(f"incorrect number of parameters ({len(sys.argv)})")

file_loader = FileSystemLoader(str(template_path.parent))
env = Environment(loader=file_loader)
@@ -29,6 +36,10 @@
env.rstrip_blocks = True

template = env.get_template(str(template_path.name))
-    with open(cluster_config_path, "r") as f:
-        cluster_config = yaml.safe_load(f)
-    print(template.render(config=cluster_config, env=os.environ))
+
+    if yaml_file_path:
+        with open(yaml_file_path, "r") as f:
+            yaml_data = yaml.safe_load(f)
+        print(template.render(config=yaml_data, env=os.environ))
+    else:
+        print(template.render(env=os.environ))
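
With this change render_template.py accepts an optional YAML config: python render_template.py cluster.yaml template.yaml.j2 renders with both the config and environment variables, while python render_template.py template.yaml.j2 renders from environment variables alone (both file names here are hypothetical).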
3 changes: 3 additions & 0 deletions pkg/consts/consts.go
@@ -49,6 +49,9 @@ var (
DefaultMaxReplicaConcurrency = int64(1024)
NeuronCoresPerInf = int64(4)
AuthHeader = "X-Cortex-Authorization"

+    MaxBucketLifecycleRules      = 100
+    AsyncWorkloadsExpirationDays = int64(7)
)

func DefaultRegistry() string {