Skip to content

Commit

Permalink
Added lifetime for the Resources in Label definition (#39)
Browse files Browse the repository at this point in the history
With a lifetime timeout we can be sure the application resource will not run forever if something goes wrong, and based on this timeout the fish node will be able to apply heuristics on how much time is left until the next resource can be provisioned (#38).

The duration is set as a string in the standard Go duration format (e.g. "1h2m3s"). If it's empty or 0, the default from the fish config will be used. If it's negative (like "-1s"), the resource should exist until the user says otherwise by deallocating it.

This change really helps with clouds, where we certainly don't need to leave the resources running for a long time.
  • Loading branch information
sparshev authored Nov 16, 2022
1 parent 2937da1 commit be38ee2
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 0 deletions.
10 changes: 10 additions & 0 deletions docs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1206,15 +1206,18 @@ components:
- multitenancy
- cpu_overbook
- ram_overbook
- lifetime
properties:
cpu:
x-go-type: uint
type: integer
minimum: 0
description: Amount of vCPUs (logical CPU with HT enabled will have 2 per core)
ram:
x-go-type: uint
type: integer
minimum: 0
description: Amount of RAM in GB
disks:
type: object
additionalProperties:
Expand All @@ -1231,6 +1234,13 @@ components:
ram_overbook:
type: boolean
description: Tolerate to RAM overbooking when executed together with other envs
lifetime:
type: string
description: |
Total lifetime of the resource as a time duration (ex. "1h30m30s"). It starts at the Resource
creation time and lasts until the user deallocates it or it is auto-deallocated by timeout.
If it's empty or "0" then the default value from the fish node config will be used. If it's
negative (ex. "-1s") then the resource will live forever or until the user requests deallocation.
ResourcesDisk:
type: object
Expand Down
9 changes: 9 additions & 0 deletions lib/fish/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
package fish

import (
"fmt"
"io/ioutil"
"os"
"time"

"github.com/adobe/aquarium-fish/lib/util"
"github.com/ghodss/yaml"
Expand All @@ -35,6 +37,8 @@ type Config struct {
NodeName string `json:"node_name"` // Last resort in case you need to override the default host node name
NodeLocation string `json:"node_location"` // Specify cluster node location for multi-dc configurations

DefaultResourceLifetime string `json:"default_resource_lifetime"` // Sets the lifetime of the resource which will be used if label definition one is not set

Drivers []ConfigDriver `json:"drivers"` // If specified - only the listed plugins will be loaded
}

Expand Down Expand Up @@ -65,6 +69,11 @@ func (c *Config) ReadConfigFile(cfg_path string) error {
c.TLSCrt = c.NodeName + ".crt"
}

_, err := time.ParseDuration(c.DefaultResourceLifetime)
if c.DefaultResourceLifetime != "" && err != nil {
return fmt.Errorf("Fish: Default Resource Lifetime parse error: %v", err)
}

return nil
}

Expand Down
31 changes: 31 additions & 0 deletions lib/fish/fish.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,25 @@ func (f *Fish) executeApplication(vote types.Vote) error {
f.ApplicationStateCreate(app_state)
}

// Getting the resource lifetime to know how much time it will live
resource_lifetime, err := time.ParseDuration(label_def.Resources.Lifetime)
if label_def.Resources.Lifetime != "" && err != nil {
log.Println("Fish: Error: Can't parse the Lifetime from Label Definition:", label.UID, res.DefinitionIndex)
// Trying to get default value from fish config
resource_lifetime, err = time.ParseDuration(f.cfg.DefaultResourceLifetime)
if f.cfg.DefaultResourceLifetime != "" && err != nil {
// Not fatal error - in worst case the resource will just sit there but at least will
// not ruin the workload execution
log.Println("Fish: Error: Can't parse the Default Resource Lifetime from fish config")
}
}
resource_timeout := res.CreatedAt.Add(resource_lifetime)
if resource_lifetime > 0 {
log.Printf("Fish: Resource %s will be deallocated by timeout in %s (%s)", app.UID, resource_lifetime, resource_timeout)
} else {
log.Println("Fish: Warning: Resource have no lifetime set and will live until deallocated by user:", app.UID)
}

// Run the loop to wait for deallocate request
var deallocate_retry uint8 = 1
for app_state.Status == types.ApplicationStatusALLOCATED {
Expand All @@ -587,6 +606,18 @@ func (f *Fish) executeApplication(vote types.Vote) error {
log.Println("Fish: Unable to get status for Application:", app.UID, err)
}

// Check if it's life timeout for the resource
if resource_lifetime > 0 {
// The time limit is set - so let's use resource create time and find out timeout
if resource_timeout.Before(time.Now()) {
// Seems the timeout has come, so fish asks for application deallocate
app_state = &types.ApplicationState{ApplicationUID: app.UID, Status: types.ApplicationStatusDEALLOCATE,
Description: fmt.Sprintf("Resource lifetime timeout reached: %s", resource_lifetime),
}
f.ApplicationStateCreate(app_state)
}
}

// Execute the existing ApplicationTasks. It will be executed during ALLOCATED or prior
// to executing deallocation by DEALLOCATE & RECALLED which right now is useful for
// `snapshot` tasks.
Expand Down
5 changes: 5 additions & 0 deletions lib/fish/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package fish
import (
"fmt"
"log"
"time"

"github.com/adobe/aquarium-fish/lib/openapi/types"
"github.com/adobe/aquarium-fish/lib/util"
Expand Down Expand Up @@ -49,6 +50,10 @@ func (f *Fish) LabelCreate(l *types.Label) error {
if def.Resources.Ram < 1 {
return fmt.Errorf("Fish: Resources RAM can't be less than 1 in Label Definition %d", i)
}
_, err := time.ParseDuration(def.Resources.Lifetime)
if def.Resources.Lifetime != "" && err != nil {
return fmt.Errorf("Fish: Resources Lifetime parse error in Label Definition %d: %v", i, err)
}
if def.Options == "" {
l.Definitions[i].Options = "{}"
}
Expand Down
145 changes: 145 additions & 0 deletions tests/label_lifetime_timeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/**
* Copyright 2021 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/

package tests

import (
"crypto/tls"
"fmt"
"net/http"
"testing"
"time"

"github.com/google/uuid"
"github.com/steinfletcher/apitest"

"github.com/adobe/aquarium-fish/lib/openapi/types"
)

// Test_label_lifetime_timeout verifies that an Application allocated from a Label whose
// resources define a "15s" lifetime is first ALLOCATED, is still ALLOCATED after a 10 second
// pause, and then reaches DEALLOCATED without the test ever sending a deallocate request.
func Test_label_lifetime_timeout(t *testing.T) {
	t.Parallel()
	afi := RunAquariumFish(t, `---
node_name: node-1
node_location: test_loc
api_address: 127.0.0.1:0
drivers:
- name: test`)

	t.Cleanup(func() {
		afi.Cleanup()
	})

	// Report (instead of propagating) a panic raised directly in this test function
	defer func() {
		if recovered := recover(); recovered != nil {
			fmt.Println("Recovered in f", recovered)
		}
	}()

	// API client: certificate verification is disabled and every request is capped at 5 seconds
	client := &http.Client{
		Timeout: 5 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		},
	}

	var (
		lbl         types.Label
		application types.Application
		appState    types.ApplicationState
	)

	// Builds the state endpoint URL lazily, since the Application UID is only known after
	// the "Create Application" step has filled it in
	stateURL := func() string {
		return afi.ApiAddress("api/v1/application/" + application.UID.String() + "/state")
	}

	t.Run("Create Label", func(t *testing.T) {
		apitest.New().
			EnableNetworking(client).
			Post(afi.ApiAddress("api/v1/label/")).
			JSON(`{"name":"test-label", "version":1, "definitions": [
{"driver":"test","resources":{"cpu":1,"ram":2,"lifetime":"15s"}}
]}`).
			BasicAuth("admin", afi.AdminToken()).
			Expect(t).
			Status(http.StatusOK).
			End().
			JSON(&lbl)

		if lbl.UID == uuid.Nil {
			t.Fatalf("Label UID is incorrect: %v", lbl.UID)
		}
	})

	t.Run("Create Application", func(t *testing.T) {
		payload := `{"label_UID":"` + lbl.UID.String() + `"}`
		apitest.New().
			EnableNetworking(client).
			Post(afi.ApiAddress("api/v1/application/")).
			JSON(payload).
			BasicAuth("admin", afi.AdminToken()).
			Expect(t).
			Status(http.StatusOK).
			End().
			JSON(&application)

		if application.UID == uuid.Nil {
			t.Fatalf("Application UID is incorrect: %v", application.UID)
		}
	})

	t.Run("Application should get ALLOCATED in 10 sec", func(t *testing.T) {
		// Poll the state once per second for up to 10 seconds
		allocTimer := &Timer{Timeout: 10 * time.Second, Wait: 1 * time.Second}
		Retry(allocTimer, t, func(r *R) {
			apitest.New().
				EnableNetworking(client).
				Get(stateURL()).
				BasicAuth("admin", afi.AdminToken()).
				Expect(r).
				Status(http.StatusOK).
				End().
				JSON(&appState)

			if got := appState.Status; got != types.ApplicationStatusALLOCATED {
				r.Fatalf("Application Status is incorrect: %v", got)
			}
		})
	})

	// Pause for 10 seconds, which is shorter than the 15s lifetime requested in the label
	time.Sleep(10 * time.Second)

	t.Run("Application should be still ALLOCATED in 10 sec", func(t *testing.T) {
		// Single check, no retries: the state must already be ALLOCATED right now
		apitest.New().
			EnableNetworking(client).
			Get(stateURL()).
			BasicAuth("admin", afi.AdminToken()).
			Expect(t).
			Status(http.StatusOK).
			End().
			JSON(&appState)

		if got := appState.Status; got != types.ApplicationStatusALLOCATED {
			t.Fatalf("Application Status is incorrect: %v", got)
		}
	})

	t.Run("Application should get DEALLOCATED in 5 sec", func(t *testing.T) {
		// Poll the state once per second for up to 5 seconds
		deallocTimer := &Timer{Timeout: 5 * time.Second, Wait: 1 * time.Second}
		Retry(deallocTimer, t, func(r *R) {
			apitest.New().
				EnableNetworking(client).
				Get(stateURL()).
				BasicAuth("admin", afi.AdminToken()).
				Expect(r).
				Status(http.StatusOK).
				End().
				JSON(&appState)

			if got := appState.Status; got != types.ApplicationStatusDEALLOCATED {
				r.Fatalf("Application Status is incorrect: %v", got)
			}
		})
	})
}

0 comments on commit be38ee2

Please sign in to comment.