Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Terminate inaccessible compute instances. #7427

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/services/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ run_host_service_dev: build

run_scaling_service: build
$(info running $(BUILD_FOLDER)/scaling-service with localdev configuration...)
APP_ENV=localdev $(BUILD_FOLDER)/scaling-service
APP_ENV=localdev $(BUILD_FOLDER)/scaling-service -nocleanup

run_scaling_service_localdevwithdb: build
$(info running $(BUILD_FOLDER)/scaling-service with localdevwithdb configuration...)
APP_ENV=localdevwithdb $(BUILD_FOLDER)/scaling-service
APP_ENV=localdevwithdb $(BUILD_FOLDER)/scaling-service -nocleanup

run_scaling_service_dev: build
$(info running $(BUILD_FOLDER)/scaling-service with dev (deployed) configuration...)
Expand Down
133 changes: 133 additions & 0 deletions backend/services/scaling-service/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) 2022 Whist Technologies, Inc.

package main

import (
"context"
"sync"
"time"

"github.com/whisthq/whist/backend/services/scaling-service/dbclient"
"github.com/whisthq/whist/backend/services/scaling-service/hosts"
"github.com/whisthq/whist/backend/services/subscriptions"
logger "github.com/whisthq/whist/backend/services/whistlogger"
)

var db dbclient.DBClient

// CleanRegion starts the unresponsive instance cleanup thread for a particular
// region and returns a function that can be used to stop it. You should call
// CleanRegion once per region, but it doesn't really make sense to call it
// more than once per region.
//
// The stop function blocks until all in-progress cleaning operations have
// completed. Consider calling this method in its own goroutine like so:
//
// var cleaner *Cleaner
// var wg sync.WaitGroup
//
// wg.Add(1)
//
// go func() {
// defer wg.Done()
// cleaner.Stop()
// }()
//
// wg.Wait()
func CleanRegion(client subscriptions.WhistGraphQLClient, h hosts.HostHandler, d time.Duration) func() {
stop := make(chan struct{})
ticker := time.NewTicker(d)
var wg sync.WaitGroup

// Don't bother adding this goroutine to the cleaner's wait group. It will
// finish as soon as the stop channel is closed.
go func() {
for {
select {
case <-ticker.C:
wg.Add(1)

go func() {
defer wg.Done()

// TODO: Make the deadline more configurable.
deadline := time.Now().Add(5 * time.Minute)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

do(ctx, client, h)
}()
case <-stop:
return
}
}
}()

return func() {
ticker.Stop()
close(stop)
wg.Wait()
}
}

// do marks all unresponsive instances as TERMINATING in the database before
// subsequently terminating them and finally removing them from the database
// altogether.
func do(ctx context.Context, client subscriptions.WhistGraphQLClient, h hosts.HostHandler) {
region := h.GetRegion()
maxAge := time.Now().Add(-150 * time.Second)
ids, err := db.LockBrokenInstances(ctx, client, region, maxAge)

if err != nil {
logger.Errorf("failed to mark unresponsive instances as TERMINATING in "+
"region %s: %s", region, err)
return
} else if len(ids) < 1 {
logger.Debugf("Didn't find any unresponsive instances in region %s",
region)
return
}

if err := h.SpinDownInstances(ctx, ids); err != nil {
logger.Errorf("failed to terminate unresponsive instances in region %s:"+
"%s", region, err)
logger.Errorf("please verify that the instances in %s with the following "+
"instance IDs have been terminated and then remove them from the "+
"database: %s", region, ids)
return
}

deleted, err := db.TerminateLockedInstances(ctx, client, region, ids)

if err != nil {
logger.Errorf("failed to remove unresponsive instances in %s from the "+
"database: %s", region, err)
return
}

if !equal(deleted, ids) {
logger.Errorf("some %s instance rows were not deleted: requested "+
"%v, got %v", region, ids, deleted)
} else {
logger.Info("Successfully removed the following unresponsive instance "+
"rows from the database for %s:", region, deleted)
}
}

func equal(u, v []string) bool {
if len(u) != len(v) {
return false
}

same := true
set := make(map[string]struct{}, len(u))

for _, s := range u {
set[s] = struct{}{}
}

for _, s := range v {
_, ok := set[s]
same = same && ok
}
}
66 changes: 66 additions & 0 deletions backend/services/scaling-service/dbclient/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package dbclient

import (
"context"
"time"

"github.com/hasura/go-graphql-client"
"github.com/whisthq/whist/backend/services/subscriptions"
)

// LockBrokenInstances sets the column of each row of the database corresponding
// to an instance that hasn't been updated since maxAge to TERMINATING. It
// returns the instance ID of every such instance.
func (c *DBClient) LockBrokenInstances(ctx context.Context, client subscriptions.WhistGraphQLClient, region string, maxAge time.Time) ([]string, error) {
var m subscriptions.LockBrokenInstances
vars := map[string]interface{}{
"maxAge": timestamptz(maxAge),
"region": graphql.String(region),
}

if err := client.Mutate(ctx, &m, vars); err != nil {
return nil, err
}

ids := make([]string, 0, m.Response.Count)

for _, host := range m.Response.Hosts {
ids = append(ids, string(host.ID))
}

return ids, nil
}

// TerminatedLockedInstances removes the requested rows, all of whose status
// columns should have TERMINATING, corresponding to unresponsive instances from
// the whist.instances table of the database. It also deletes all rows from the
// whist.mandelboxes table that are foreign keyed to a whist.instances row whose
// deletion has been requested.
func (c *DBClient) TerminateLockedInstances(ctx context.Context, client subscriptions.WhistGraphQLClient, region string, ids []string) ([]string, error) {
var m subscriptions.TerminateLockedInstances

// We need to pass the instance IDs as a slice of graphql String type
//instances, not just a normal string slice.
_ids := make([]graphql.String, 0, len(ids))

for _, id := range ids {
_ids = append(_ids, graphql.String(id))
}

vars := map[string]interface{}{
"ids": _ids,
"region": graphql.String(region),
}

if err := client.Mutate(ctx, &m, vars); err != nil {
return nil, err
}

terminated := make([]string, 0, m.InstancesResponse.Count)

for _, host := range m.InstancesResponse.Hosts {
terminated = append(terminated, string(host.ID))
}

return terminated, nil
}
155 changes: 155 additions & 0 deletions backend/services/scaling-service/dbclient/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (c) 2022 Whist Technologies, Inc.

package dbclient

import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"testing"
"time"

"github.com/hasura/go-graphql-client"
"github.com/whisthq/whist/backend/services/subscriptions"
)

var pkg DBClient

// mockResponse generates a mock GraphQL mutation response.
func mockResponse(field string, ids []string) ([]byte, error) {
// The response contains a list of instance IDs.
type host struct {
ID string `json:"id"`
}

count := len(ids)
hosts := make([]host, 0, count)

for _, id := range ids {
hosts = append(hosts, host{id})
}

data := struct {
Count int `json:"affected_rows"`
Hosts []host `json:"returning"`
}{count, hosts}

// Dynamically construct an auxiliary type to serialize the mock response
// data.
t := reflect.StructOf([]reflect.StructField{
{
Name: "Response",
Type: reflect.TypeOf(data),
Tag: reflect.StructTag(fmt.Sprintf(`json:"%s"`, field)),
},
})
v := reflect.New(t)

v.Elem().FieldByName("Response").Set(reflect.ValueOf(data))

return json.Marshal(v.Interface())
}

// testClient provides mock responses to GraphQL mutations.
type testClient struct {
// ids contains mock instance IDs of unresponsive instances.
ids []string
}

// Initialize is part of the subscriptions.WhistGraphQLClient interface.
func (*testClient) Initialize(bool) error {
return nil
}

// Query is part of the subscriptions.WhistGraphQLClient interface.
func (*testClient) Query(context.Context, subscriptions.GraphQLQuery, map[string]interface{}) error {
return nil
}

// Mutate is part of the subscriptions.WhistGraphQLClient interface. This
// implementation populates the mutation struct with mock host data.
func (c *testClient) Mutate(_ context.Context, q subscriptions.GraphQLQuery, v map[string]interface{}) error {
// Depending what kind of mock response we are providing, we select a list of
// instance IDs to return.
var ids []string
var field string

switch q.(type) {
case *subscriptions.LockBrokenInstances:
field = "update_whist_instances"

// We are providing a mock response to markForTermination, so we use the
// list of instance IDs stored in the "database" (i.e. struct)
ids = c.ids
case *subscriptions.TerminateLockedInstances:
field = "delete_whist_instances"

// We are providing a mock response to finalizeTermination, so we use the
// value of the instance IDs input variable.
tmp1, ok := v["ids"]

if !ok {
return errors.New("missing instance IDs variable")
}

tmp2, ok := tmp1.([]graphql.String)

if !ok {
return errors.New("instance IDs input variable should be a slice of " +
"graphql String types.")
}

ids = make([]string, 0, len(tmp2))

for _, id := range tmp2 {
ids = append(ids, string(id))
}
default:
t := reflect.TypeOf(q)
return fmt.Errorf("unrecognized mutation '%s'", t)
}

data, err := mockResponse(field, ids)

if err != nil {
return err
}

// Deserialize the mock data into the response struct.
if err := graphql.UnmarshalGraphQL(data, q); err != nil {
return err
}

return nil
}

// TestLockBrokenInstances tests that LockBrokenInstances converts the GraphQL
// mutation response from Hasura to a slice of instance IDs.
func TestLockBrokenInstances(t *testing.T) {
var tt time.Time
ids := []string{"instance-0", "instance-1", "instance-2"}
client := &testClient{ids: ids}
res, err := pkg.LockBrokenInstances(context.TODO(), client, "us-east-1", tt)

if err != nil {
t.Error("markForTermination:", err)
} else if !reflect.DeepEqual(res, ids) {
t.Error(fmt.Printf("Expected %v, got %v", ids, res))
}
}

// TestTerminateLockedInstances tests that TerminateLockedInstances converts
// the GraphQL mutation response from Hasura to a slice of instance IDs.
func TestTerminateLockedInstances(t *testing.T) {
client := &testClient{}
ids := []string{"instance-0", "instance-1", "instance-2"}
res, err := pkg.TerminateLockedInstances(context.TODO(), client, "us-east-1", ids)

if err != nil {
t.Error("finalizeTermination:", err)
} else if !reflect.DeepEqual(res, ids) {
t.Error(fmt.Printf("Expected %v, got %v", ids, res))
}
}
4 changes: 4 additions & 0 deletions backend/services/scaling-service/dbclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package dbclient

import (
"context"
"time"

"github.com/whisthq/whist/backend/services/subscriptions"
)
Expand All @@ -37,6 +38,9 @@ type WhistDBClient interface {
QueryUserMandelboxes(context.Context, subscriptions.WhistGraphQLClient, string) ([]subscriptions.Mandelbox, error)
InsertMandelboxes(context.Context, subscriptions.WhistGraphQLClient, []subscriptions.Mandelbox) (int, error)
UpdateMandelbox(context.Context, subscriptions.WhistGraphQLClient, subscriptions.Mandelbox) (int, error)

LockBrokenInstances(context.Context, subscriptions.WhistGraphQLClient, string, time.Time) ([]string, error)
TerminateLockedInstances(context.Context, subscriptions.WhistGraphQLClient, string, []string) ([]string, error)
}

// DBClient implements `WhistDBClient`, it is the default database
Expand Down
Loading