Skip to content

Commit

Permalink
internal/coordinator/pool: add a hard limit for a1.metal instances
Browse files Browse the repository at this point in the history
This change allows the ledger to track how many a1.metal instances
have either been created or are in the process of being created. A
hard limit of 1 has been set for a1.metal limits while testing is
being performed.

For golang/go#42604

Change-Id: I5bcb3a65407af07d225caf2884877ce040ee011b
Reviewed-on: https://go-review.googlesource.com/c/build/+/322855
Trust: Carlos Amedee <carlos@golang.org>
Run-TryBot: Carlos Amedee <carlos@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
  • Loading branch information
cagedmantis committed May 26, 2021
1 parent f2096ad commit 79f0ef2
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 13 deletions.
32 changes: 26 additions & 6 deletions internal/coordinator/pool/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type entry struct {
createdAt time.Time
instanceID string
instanceName string
instanceType string
vCPUCount int64
}

Expand All @@ -40,6 +41,10 @@ type ledger struct {
// entries contains a mapping of instance name to entries for each instance
// that has resources allocated to it.
entries map[string]*entry
// instanceA1Limit is the limit of a1.metal instances which can be created on EC2.
instanceA1Limit int64
// instanceA1Used is the current count of a1.metal instances.
instanceA1Used int64
// types contains a mapping of instance type names to instance types for each
// ARM64 EC2 instance.
types map[string]*cloud.InstanceType
Expand All @@ -48,8 +53,9 @@ type ledger struct {
// newLedger creates a new ledger.
func newLedger() *ledger {
return &ledger{
entries: make(map[string]*entry),
types: make(map[string]*cloud.InstanceType),
entries: make(map[string]*entry),
instanceA1Limit: 1, // TODO(golang.org/issue/42604) query for limit once issue is resolved.
types: make(map[string]*cloud.InstanceType),
}
}

Expand All @@ -65,7 +71,7 @@ func (l *ledger) ReserveResources(ctx context.Context, instName, vmType string)
t := time.NewTicker(2 * time.Second)
defer t.Stop()
for {
if l.allocateCPU(instType.CPU, instName) {
if l.allocateResources(instType.CPU, instName, instType.Type) {
return nil
}
select {
Expand Down Expand Up @@ -94,44 +100,58 @@ func (l *ledger) PrepareReservationRequest(instName, vmType string) (*cloud.Inst
return instType, nil
}

// releaseResources deletes the entry associated with an instance. The resources associated to the
const a1MetalInstance = "a1.metal" // added for golang.org/issue/42604

// releaseResources deletes the entry associated with an instance. The resources associated with the
// instance will also be released. An error is returned if the instance entry is not found.
// Lock l.mu must be held by the caller.
func (l *ledger) releaseResources(instName string) error {
e, ok := l.entries[instName]
if !ok {
return fmt.Errorf("instance not found for releasing quota: %s", instName)
}
if e.instanceType == a1MetalInstance && l.instanceA1Used > 0 {
l.instanceA1Used--
}
l.deallocateCPU(e.vCPUCount)
return nil
}

// allocateCPU ensures that there is enough CPU to allocate below the CPU Quota
// allocateResources ensures that there is enough CPU to allocate below the CPU Quota
// for the caller to create a resouce with the numCPU passed in. If there is enough
// then the ammount of used CPU will increase by the requested ammount. If there is
// not enough CPU available, then a false is returned. In the event that CPU is allocated
// an entry will be added in the entries map for the instance.
func (l *ledger) allocateCPU(numCPU int64, instName string) bool {
// It also enforces instance type limits.
func (l *ledger) allocateResources(numCPU int64, instName, instType string) bool {
// should never happen
if numCPU <= 0 {
log.Printf("invalid allocation requested: %d", numCPU)
return false
}
isA1Metal := instType == a1MetalInstance

l.mu.Lock()
defer l.mu.Unlock()

if isA1Metal && l.instanceA1Used >= l.instanceA1Limit {
return false
}
if numCPU+l.cpuUsed > l.cpuLimit {
return false
}
l.cpuUsed += numCPU
if isA1Metal {
l.instanceA1Used++
}
e, ok := l.entries[instName]
if ok {
e.vCPUCount = numCPU
} else {
l.entries[instName] = &entry{
instanceName: instName,
vCPUCount: numCPU,
instanceType: instType,
}
}
return true
Expand Down
112 changes: 105 additions & 7 deletions internal/coordinator/pool/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func TestLedgerReleaseResources(t *testing.T) {
instName string
entry *entry
cpuUsed int64
a1Used int64
wantCPUUsed int64
wantA1Used int64
wantErr bool
}{
{
Expand All @@ -126,7 +128,9 @@ func TestLedgerReleaseResources(t *testing.T) {
vCPUCount: 10,
},
cpuUsed: 20,
a1Used: 0,
wantCPUUsed: 10,
wantA1Used: 0,
wantErr: false,
},
{
Expand All @@ -137,14 +141,45 @@ func TestLedgerReleaseResources(t *testing.T) {
vCPUCount: 10,
},
cpuUsed: 20,
a1Used: 0,
wantCPUUsed: 20,
wantA1Used: 0,
wantErr: true,
},
{
desc: "success-with-a1-instance",
instName: "inst-x",
entry: &entry{
instanceName: "inst-x",
vCPUCount: 10,
instanceType: a1MetalInstance,
},
cpuUsed: 20,
a1Used: 1,
wantCPUUsed: 10,
wantA1Used: 0,
wantErr: false,
},
{
desc: "entry-not-found-with-a1-instance",
instName: "inst-x",
entry: &entry{
instanceName: "inst-w",
vCPUCount: 10,
instanceType: a1MetalInstance,
},
cpuUsed: 20,
a1Used: 1,
wantCPUUsed: 20,
wantA1Used: 1,
wantErr: true,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
l := &ledger{
cpuUsed: tc.cpuUsed,
cpuUsed: tc.cpuUsed,
instanceA1Used: tc.a1Used,
entries: map[string]*entry{
tc.entry.instanceName: tc.entry,
},
Expand All @@ -156,62 +191,125 @@ func TestLedgerReleaseResources(t *testing.T) {
if l.cpuUsed != tc.wantCPUUsed {
t.Errorf("ledger.cpuUsed = %d; wanted %d", l.cpuUsed, tc.wantCPUUsed)
}
if l.instanceA1Used != tc.wantA1Used {
t.Errorf("ledger.instanceA1Used = %d; wanted %d", l.instanceA1Used, tc.wantA1Used)
}
})
}
}

func TestLedgerAllocateCPU(t *testing.T) {
func TestLedgerAllocateResources(t *testing.T) {
testCases := []struct {
desc string
numCPU int64
cpuLimit int64
cpuUsed int64
a1Used int64
a1Limit int64
instName string
instType string
wantReserve bool
wantCPUUsed int64
wantA1Used int64
}{
{
desc: "reservation-success",
numCPU: 10,
cpuLimit: 10,
cpuUsed: 0,
a1Used: 0,
a1Limit: 1,
instName: "chacha",
instType: "x.type",
wantReserve: true,
wantCPUUsed: 10,
wantA1Used: 0,
},
{
desc: "failed-to-reserve",
a1Used: 0,
a1Limit: 1,
numCPU: 10,
cpuLimit: 5,
cpuUsed: 0,
instName: "pasa",
instType: "x.type",
wantReserve: false,
wantCPUUsed: 0,
wantA1Used: 0,
},
{
desc: "invalid-cpu-count",
a1Used: 0,
a1Limit: 1,
numCPU: 0,
cpuLimit: 50,
cpuUsed: 20,
instName: "double",
instType: "x.type",
wantReserve: false,
wantCPUUsed: 20,
wantA1Used: 0,
},
{
desc: "reservation-success with a1.metal instance",
numCPU: 10,
cpuLimit: 10,
cpuUsed: 0,
a1Used: 0,
a1Limit: 1,
instName: "chacha",
instType: a1MetalInstance,
wantReserve: true,
wantCPUUsed: 10,
wantA1Used: 1,
},
{
desc: "failed-to-reserve with a1.metal instance",
a1Used: 0,
a1Limit: 1,
numCPU: 10,
cpuLimit: 5,
cpuUsed: 0,
instName: "pasa",
instType: a1MetalInstance,
wantReserve: false,
wantCPUUsed: 0,
wantA1Used: 0,
},
{
desc: "invalid-cpu-count with a1.metal instance",
a1Used: 0,
a1Limit: 10,
numCPU: 0,
cpuLimit: 50,
cpuUsed: 20,
instName: "double",
instType: a1MetalInstance,
wantReserve: false,
wantCPUUsed: 20,
wantA1Used: 0,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
l := &ledger{
entries: make(map[string]*entry),
cpuLimit: tc.cpuLimit,
cpuUsed: tc.cpuUsed,
entries: make(map[string]*entry),
cpuLimit: tc.cpuLimit,
cpuUsed: tc.cpuUsed,
instanceA1Limit: tc.a1Limit,
instanceA1Used: tc.a1Used,
}
gotReserve := l.allocateCPU(tc.numCPU, tc.instName)
gotReserve := l.allocateResources(tc.numCPU, tc.instName, tc.instType)
if gotReserve != tc.wantReserve {
t.Errorf("ledger.allocateCPU(%d) = %v, want %v", tc.numCPU, gotReserve, tc.wantReserve)
t.Errorf("ledger.allocateResources(%d) = %v, want %v", tc.numCPU, gotReserve, tc.wantReserve)
}
if l.cpuUsed != tc.wantCPUUsed {
t.Errorf("ledger.cpuUsed = %d; want %d", l.cpuUsed, tc.wantCPUUsed)
}
if l.instanceA1Used != tc.wantA1Used {
t.Errorf("ledger.instanceA1Used = %d; want %d", l.instanceA1Used, tc.wantA1Used)
}
if _, ok := l.entries[tc.instName]; tc.wantReserve && !ok {
t.Fatalf("ledger.entries[%s] = nil; want it to exist", tc.instName)
}
Expand Down

0 comments on commit 79f0ef2

Please sign in to comment.