Skip to content

Commit

Permalink
Run unresponsive instance cleanup threads
Browse files Browse the repository at this point in the history
  • Loading branch information
owenniles committed Oct 21, 2022
1 parent a72c970 commit 2fbe5b9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 29 deletions.
53 changes: 38 additions & 15 deletions backend/services/scaling-service/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/whisthq/whist/backend/services/metadata"
"github.com/whisthq/whist/backend/services/scaling-service/config"
"github.com/whisthq/whist/backend/services/scaling-service/dbclient"
hosts "github.com/whisthq/whist/backend/services/scaling-service/hosts/aws"
algos "github.com/whisthq/whist/backend/services/scaling-service/scaling_algorithms/default" // Import as algos, short for scaling_algorithms
"github.com/whisthq/whist/backend/services/subscriptions"
"github.com/whisthq/whist/backend/services/utils"
Expand Down Expand Up @@ -115,26 +116,48 @@ func main() {
// Use a sync map since we only write the keys once but will be reading multiple
// times by different goroutines.
algorithmByRegionMap := &sync.Map{}
regions := config.GetEnabledRegions()
stopFuncs := make([]func(), 0, len(regions))

// Load default scaling algorithm for all enabled regions.
for _, region := range config.GetEnabledRegions() {
// Load and instantiate default scaling algorithm for all enabled regions.
for _, region := range regions {
name := utils.Sprintf("default-sa-%s", region)
algorithmByRegionMap.Store(name, &algos.DefaultScalingAlgorithm{
Region: region,
})
handler := &hosts.AWSHost{}

if err := handler.Initialize(region); err != nil {
logger.Errorf("Failed to initialize host handler for region '%s'", region)
continue
}

algo := &algos.DefaultScalingAlgorithm{Host: handler, Region: region}

algo.CreateEventChans()
algo.CreateGraphQLClient(graphqlClient)
algo.CreateDBClient(dbClient)
algo.ProcessEvents(globalCtx, goroutineTracker)
algo.GetConfig(configGraphqlClient)

stop := CleanRegion(graphqlClient, handler)

stopFuncs = append(stopFuncs, stop)
algorithmByRegionMap.Store(name, algo)
}

// Instantiate scaling algorithms on allowed regions
algorithmByRegionMap.Range(func(key, value interface{}) bool {
scalingAlgorithm := value.(algos.ScalingAlgorithm)
scalingAlgorithm.CreateEventChans()
scalingAlgorithm.CreateGraphQLClient(graphqlClient)
scalingAlgorithm.CreateDBClient(dbClient)
scalingAlgorithm.ProcessEvents(globalCtx, goroutineTracker)
scalingAlgorithm.GetConfig(configGraphqlClient)
// Wait for each of our cleanup threads to finish before we exit.
defer func() {
var wg sync.WaitGroup

return true
})
for j := range stopFuncs {
wg.Add(1)

go func(i int) {
defer wg.Done()
stopFuncs[i]()
}(j)
}

wg.Wait()
}()

// Start main event loop
go eventLoop(globalCtx, globalCancel, serverEvents, subscriptionEvents, scheduledEvents, algorithmByRegionMap, configClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/whisthq/whist/backend/services/scaling-service/config"
"github.com/whisthq/whist/backend/services/scaling-service/dbclient"
"github.com/whisthq/whist/backend/services/scaling-service/hosts"
aws "github.com/whisthq/whist/backend/services/scaling-service/hosts/aws"
"github.com/whisthq/whist/backend/services/subscriptions"
"github.com/whisthq/whist/backend/services/utils"
logger "github.com/whisthq/whist/backend/services/whistlogger"
Expand Down Expand Up @@ -195,19 +194,6 @@ func (s *DefaultScalingAlgorithm) GetConfig(client subscriptions.WhistGraphQLCli
// events and executing the appropiate scaling actions. This function is specific for each region
// scaling algorithm to be able to implement different strategies on each region.
func (s *DefaultScalingAlgorithm) ProcessEvents(globalCtx context.Context, goroutineTracker *sync.WaitGroup) {
if s.Host == nil {
// TODO when multi-cloud support is introduced, figure out a way to
// decide which host to use. For now default to AWS.
handler := &aws.AWSHost{}
err := handler.Initialize(s.Region)

if err != nil {
logger.Errorf("error starting host in %s: %s", s.Region, err)
}

s.Host = handler
}

// Start algorithm main event loop
// Track this goroutine so we can wait for it to
// finish if the global context gets cancelled.
Expand Down

0 comments on commit 2fbe5b9

Please sign in to comment.