Skip to content

Commit

Permalink
Merge pull request #878 from remind101/stabilize
Browse files Browse the repository at this point in the history
Wait for newly created services to stabilize.
  • Loading branch information
ejholmes authored Jun 16, 2016
2 parents 204cff5 + 68bb6cf commit 2286595
Show file tree
Hide file tree
Showing 14 changed files with 443 additions and 201 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## HEAD

**Features**

**Bugs**

* The Custom::ECSService custom resource now waits for newly created ECS services to stabilize [#878](https://github.com/remind101/empire/pull/878)

**Performance**

**Security**

## 0.10.1 (2016-06-14)

**Features**
Expand Down
150 changes: 114 additions & 36 deletions server/cloudformation/cloudformation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,80 @@ import (
"github.com/remind101/pkg/reporter"
)

var (
// Allow custom resource provisioners this amount of time do their
// thing.
ProvisioningTimeout = time.Duration(20 * time.Minute)

// And this amount of time to cleanup when they're canceled.
ProvisioningGraceTimeout = time.Duration(1 * time.Minute)
)

// Provisioner is something that can provision custom resources.
type Provisioner interface {
// Provision should do the appropriate provisioning, then return:
//
// 1. The physical id that was created, if any.
// 2. The data to return.
Provision(Request) (string, interface{}, error)
Provision(context.Context, Request) (string, interface{}, error)

// Properties should return an instance of a type that the properties
// can be json.Unmarshalled into.
Properties() interface{}
}

type ProvisionerFunc func(context.Context, Request) (string, interface{}, error)

func (fn ProvisionerFunc) Provision(ctx context.Context, r Request) (string, interface{}, error) {
return fn(ctx, r)
}

// withTimeout wraps a Provisioner with a context.WithTimeout.
func withTimeout(p Provisioner, timeout time.Duration, grace time.Duration) Provisioner {

return &timeoutProvisioner{
Provisioner: p,
timeout: timeout,
grace: grace,
}
}

type result struct {
id string
data interface{}
err error
}

type timeoutProvisioner struct {
Provisioner
timeout time.Duration
grace time.Duration
}

func (p *timeoutProvisioner) Provision(ctx context.Context, r Request) (string, interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()

done := make(chan result)
go func() {
id, data, err := p.Provisioner.Provision(ctx, r)
done <- result{id, data, err}
}()

select {
case r := <-done:
return r.id, r.data, r.err
case <-ctx.Done():
// When the context is canceled, give the provisioner
// some extra time to cleanup.
<-time.After(p.grace)
select {
case r := <-done:
return r.id, r.data, r.err
default:
return "", nil, ctx.Err()
}
}
}

// Possible request types.
Expand Down Expand Up @@ -189,19 +256,29 @@ type CustomResourceProvisioner struct {
// NewCustomResourceProvisioner returns a new CustomResourceProvisioner with an
// sqs client configured from config.
func NewCustomResourceProvisioner(db *sql.DB, config client.ConfigProvider) *CustomResourceProvisioner {
return &CustomResourceProvisioner{
Provisioners: map[string]Provisioner{
"Custom::InstancePort": &InstancePortsProvisioner{
ports: lb.NewDBPortAllocator(db),
},
"Custom::ECSService": &ECSServiceResource{
ecs: ecs.New(config),
postfix: postfix,
},
},
client: http.DefaultClient,
sqs: sqs.New(config),
p := &CustomResourceProvisioner{
Provisioners: make(map[string]Provisioner),
client: http.DefaultClient,
sqs: sqs.New(config),
}

p.add("Custom::InstancePort", &InstancePortsProvisioner{
ports: lb.NewDBPortAllocator(db),
})

p.add("Custom::ECSService", &ECSServiceResource{
ecs: ecs.New(config),
postfix: postfix,
})

return p
}

// add adds a custom resource provisioner.
func (c *CustomResourceProvisioner) add(resourceName string, p Provisioner) {
// Wrap the provisioner with timeouts.
p = withTimeout(p, ProvisioningTimeout, ProvisioningGraceTimeout)
c.Provisioners[resourceName] = p
}

// Start starts pulling requests from the queue and provisioning them.
Expand All @@ -220,10 +297,11 @@ func (c *CustomResourceProvisioner) Start() {
}

for _, m := range resp.Messages {
if err := c.handle(ctx, m); err != nil {
reporter.Report(ctx, err)
continue
}
go func(m *sqs.Message) {
if err := c.handle(ctx, m); err != nil {
reporter.Report(ctx, err)
}
}(m)
}
}
}
Expand Down Expand Up @@ -254,26 +332,8 @@ func (c *CustomResourceProvisioner) Handle(ctx context.Context, message *sqs.Mes
return fmt.Errorf("error unmarshalling to cloudformation request: %v", err)
}

p, ok := c.Provisioners[req.ResourceType]
if !ok {
return fmt.Errorf("no provisioner for %v", req.ResourceType)
}

// If the provisioner defines a type for the properties, let's unmarhsal
// into that Go type.
if p, ok := p.(interface {
Properties() interface{}
}); ok {
req.ResourceProperties = p.Properties()
req.OldResourceProperties = p.Properties()
err = json.Unmarshal([]byte(m.Message), &req)
if err != nil {
return fmt.Errorf("error unmarshalling to cloudformation request: %v", err)
}
}

resp := NewResponseFromRequest(req)
resp.PhysicalResourceId, resp.Data, err = p.Provision(req)
resp.PhysicalResourceId, resp.Data, err = c.provision(ctx, m, req)
switch err {
case nil:
resp.Status = StatusSuccess
Expand Down Expand Up @@ -315,6 +375,24 @@ func (c *CustomResourceProvisioner) Handle(ctx context.Context, message *sqs.Mes
return nil
}

func (c *CustomResourceProvisioner) provision(ctx context.Context, m Message, req Request) (string, interface{}, error) {
p, ok := c.Provisioners[req.ResourceType]
if !ok {
return "", nil, fmt.Errorf("no provisioner for %v", req.ResourceType)
}

// If the provisioner defines a type for the properties, let's unmarhsal
// into that Go type.
req.ResourceProperties = p.Properties()
req.OldResourceProperties = p.Properties()
err := json.Unmarshal([]byte(m.Message), &req)
if err != nil {
return "", nil, fmt.Errorf("error unmarshalling to cloudformation request: %v", err)
}

return p.Provision(ctx, req)
}

// IntValue defines an int64 type that can parse integers as strings from json.
// It's common to use `Ref`'s inside templates, which means the value of some
// properties could be a string or an integer.
Expand Down
43 changes: 42 additions & 1 deletion server/cloudformation/cloudformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"net/http"
"testing"
"time"

"golang.org/x/net/context"

Expand All @@ -15,6 +16,8 @@ import (
"github.com/stretchr/testify/mock"
)

var ctx = context.Background()

func TestCustomResourceProvisioner_Handle(t *testing.T) {
p := new(mockProvisioner)
h := new(mockHTTPClient)
Expand Down Expand Up @@ -92,15 +95,53 @@ func TestIntValue(t *testing.T) {
}
}

func TestWithTimeout_NoTimeout(t *testing.T) {
m := new(mockProvisioner)
p := withTimeout(m, time.Second, time.Second)

m.On("Provision", Request{}).Return("id", nil, nil)

p.Provision(ctx, Request{})
}

func TestWithTimeout_Timeout_Cleanup(t *testing.T) {
m := new(mockProvisioner)
p := withTimeout(m, time.Millisecond*500, time.Millisecond*500)

m.On("Provision", Request{}).Return("id", nil, nil).Run(func(mock.Arguments) {
time.Sleep(time.Millisecond * 750)
})

id, _, err := p.Provision(ctx, Request{})
assert.NoError(t, err)
assert.Equal(t, "id", id)
}

func TestWithTimeout_GraceTimeout(t *testing.T) {
m := new(mockProvisioner)
p := withTimeout(m, time.Millisecond*500, time.Millisecond*500)

m.On("Provision", Request{}).Return("id", nil, nil).Run(func(mock.Arguments) {
time.Sleep(time.Millisecond * 1500)
})

_, _, err := p.Provision(ctx, Request{})
assert.Equal(t, context.DeadlineExceeded, err)
}

type mockProvisioner struct {
mock.Mock
}

func (m *mockProvisioner) Provision(req Request) (string, interface{}, error) {
func (m *mockProvisioner) Provision(_ context.Context, req Request) (string, interface{}, error) {
args := m.Called(req)
return args.String(0), args.Get(1), args.Error(2)
}

func (m *mockProvisioner) Properties() interface{} {
return nil
}

type mockHTTPClient struct {
mock.Mock
}
Expand Down
40 changes: 33 additions & 7 deletions server/cloudformation/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ import (
"reflect"
"strings"

"golang.org/x/net/context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/remind101/pkg/reporter"
)

type ecsClient interface {
CreateService(*ecs.CreateServiceInput) (*ecs.CreateServiceOutput, error)
DeleteService(*ecs.DeleteServiceInput) (*ecs.DeleteServiceOutput, error)
UpdateService(*ecs.UpdateServiceInput) (*ecs.UpdateServiceOutput, error)
WaitUntilServicesStable(*ecs.DescribeServicesInput) error
}

type LoadBalancer struct {
Expand Down Expand Up @@ -47,17 +51,17 @@ func (p *ECSServiceResource) Properties() interface{} {
return &ECSServiceProperties{}
}

func (p *ECSServiceResource) Provision(req Request) (string, interface{}, error) {
func (p *ECSServiceResource) Provision(ctx context.Context, req Request) (string, interface{}, error) {
properties := req.ResourceProperties.(*ECSServiceProperties)
oldProperties := req.OldResourceProperties.(*ECSServiceProperties)

switch req.RequestType {
case Create:
id, err := p.create(properties)
id, err := p.create(ctx, properties)
return id, nil, err
case Delete:
id := req.PhysicalResourceId
err := p.delete(aws.String(id), properties.Cluster)
err := p.delete(ctx, aws.String(id), properties.Cluster)
return id, nil, err
case Update:
id := req.PhysicalResourceId
Expand All @@ -66,7 +70,7 @@ func (p *ECSServiceResource) Provision(req Request) (string, interface{}, error)
// If we can't update the service, we'll need to create a new
// one, and destroy the old one.
oldId := id
id, err := p.create(properties)
id, err := p.create(ctx, properties)
if err != nil {
return oldId, nil, err
}
Expand All @@ -90,7 +94,7 @@ func (p *ECSServiceResource) Provision(req Request) (string, interface{}, error)
}
}

func (p *ECSServiceResource) create(properties *ECSServiceProperties) (string, error) {
func (p *ECSServiceResource) create(ctx context.Context, properties *ECSServiceProperties) (string, error) {
var loadBalancers []*ecs.LoadBalancer
for _, v := range properties.LoadBalancers {
loadBalancers = append(loadBalancers, &ecs.LoadBalancer{
Expand All @@ -117,10 +121,32 @@ func (p *ECSServiceResource) create(properties *ECSServiceProperties) (string, e
return "", fmt.Errorf("error creating service: %v", err)
}

return *resp.Service.ServiceArn, nil
arn := resp.Service.ServiceArn

stabilized := make(chan struct{})
go func() {
if err := p.ecs.WaitUntilServicesStable(&ecs.DescribeServicesInput{
Cluster: properties.Cluster,
Services: []*string{arn},
}); err != nil {
// We're ignoring this error, because the service was created,
// and if the service doesn't stabilize, it's better to just let
// the stack finish creating than rolling back.
reporter.Report(ctx, err)
}
close(stabilized)
}()

select {
case <-stabilized:
case <-ctx.Done():
return *arn, ctx.Err()
}

return *arn, nil
}

func (p *ECSServiceResource) delete(service, cluster *string) error {
func (p *ECSServiceResource) delete(ctx context.Context, service, cluster *string) error {
// We have to scale the service down to 0, before we're able to
// destroy it.
if _, err := p.ecs.UpdateService(&ecs.UpdateServiceInput{
Expand Down
Loading

0 comments on commit 2286595

Please sign in to comment.