From db491371161c74bbf10debbca8f2678e8291a55d Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 8 Jan 2020 17:22:05 -0500 Subject: [PATCH 1/9] CLI: protect against AllocatedResources being nil --- command/alloc_status.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/alloc_status.go b/command/alloc_status.go index 1184534c6a3..3ea543d9052 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -191,7 +191,7 @@ func (c *AllocStatusCommand) Run(args []string) int { } c.Ui.Output(output) - if len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() { + if alloc.AllocatedResources != nil && len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() { c.Ui.Output("") c.Ui.Output(formatAllocNetworkInfo(alloc)) } From 7783c137a34530a176c8952f0cac858ee891b8ab Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 8 Jan 2020 17:23:56 -0500 Subject: [PATCH 2/9] Migrate old alloc structs on read This commit ensures that Alloc.AllocatedResources is properly populated when read from persistence stores (namely Raft and client state store). The alloc struct may have been written previously by an arbitrary old version that may only populate Alloc.TaskResources. 
--- client/state/state_database.go | 3 ++ nomad/fsm.go | 28 ++++++++------- nomad/fsm_test.go | 66 ++++++++++++++++++++++++++++++++++ nomad/structs/structs.go | 39 ++++++++++++++++++-- nomad/structs/structs_test.go | 61 +++++++++++++++++++++++++++++++ 5 files changed, 183 insertions(+), 14 deletions(-) diff --git a/client/state/state_database.go b/client/state/state_database.go index d6784e98ee5..c513e13da39 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -207,6 +207,9 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m continue } + // Handle upgrade path + ae.Alloc.Canonicalize() + allocs = append(allocs, ae.Alloc) } diff --git a/nomad/fsm.go b/nomad/fsm.go index c41e5b9d1b4..fbe9b2342be 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -681,22 +681,23 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { // prior to being inserted into MemDB. structs.DenormalizeAllocationJobs(req.Job, req.Alloc) - // COMPAT(0.11): Remove in 0.11 - // Calculate the total resources of allocations. It is pulled out in the - // payload to avoid encoding something that can be computed, but should be - // denormalized prior to being inserted into MemDB. for _, alloc := range req.Alloc { - if alloc.Resources != nil { - continue - } + // COMPAT(0.11): Remove in 0.11 + // Calculate the total resources of allocations. It is pulled out in the + // payload to avoid encoding something that can be computed, but should be + // denormalized prior to being inserted into MemDB. 
+ if alloc.Resources == nil { + alloc.Resources = new(structs.Resources) + for _, task := range alloc.TaskResources { + alloc.Resources.Add(task) + } - alloc.Resources = new(structs.Resources) - for _, task := range alloc.TaskResources { - alloc.Resources.Add(task) + // Add the shared resources + alloc.Resources.Add(alloc.SharedResources) } - // Add the shared resources - alloc.Resources.Add(alloc.SharedResources) + // Handle upgrade path + alloc.Canonicalize() } if err := n.state.UpsertAllocs(index, req.Alloc); err != nil { @@ -1166,6 +1167,9 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } + // Handle upgrade path + alloc.Canonicalize() + if err := restore.AllocRestore(alloc); err != nil { return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index d75ba1ee396..a9d712c6e38 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -1379,6 +1379,45 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) { } } +// TestFSM_UpsertAllocs_Canonicalize asserts that allocations are Canonicalized +// to handle logs emitted by servers running old versions +func TestFSM_UpsertAllocs_Canonicalize(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + alloc := mock.Alloc() + alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store + alloc.AllocatedResources = nil + + // pre-assert that our mock populates old field + require.NotEmpty(t, alloc.TaskResources) + + fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) + req := structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{alloc}, + } + buf, err := structs.Encode(structs.AllocUpdateRequestType, req) + require.NoError(t, err) + + resp := fsm.Apply(makeLog(buf)) + require.Nil(t, resp) + + // Verify we are registered + ws := memdb.NewWatchSet() + out, err := fsm.State().AllocByID(ws, alloc.ID) + require.NoError(t, err) + + require.NotNil(t, out.AllocatedResources) + require.Contains(t, out.AllocatedResources.Tasks, 
"web") + + expected := alloc.Copy() + expected.Canonicalize() + expected.CreateIndex = out.CreateIndex + expected.ModifyIndex = out.ModifyIndex + expected.AllocModifyIndex = out.AllocModifyIndex + require.Equal(t, expected, out) +} + func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { t.Parallel() fsm := testFSM(t) @@ -2453,6 +2492,33 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) { } } +func TestFSM_SnapshotRestore_Allocs_Canonicalize(t *testing.T) { + t.Parallel() + // Add some state + fsm := testFSM(t) + state := fsm.State() + alloc := mock.Alloc() + + // remove old versions to force migration path + alloc.AllocatedResources = nil + + state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) + state.UpsertAllocs(1000, []*structs.Allocation{alloc}) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + ws := memdb.NewWatchSet() + out, err := state2.AllocByID(ws, alloc.ID) + require.NoError(t, err) + + require.NotNil(t, out.AllocatedResources) + require.Contains(t, out.AllocatedResources.Tasks, "web") + + alloc.Canonicalize() + require.Equal(t, alloc, out) +} + func TestFSM_SnapshotRestore_Indexes(t *testing.T) { t.Parallel() // Add some state diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 4c6b7c4efbd..1356e75eb2f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7527,15 +7527,17 @@ type Allocation struct { // the scheduler. Resources *Resources - // COMPAT(0.11): Remove in 0.11 // SharedResources are the resources that are shared by all the tasks in an // allocation + // Deprecated: use AllocatedResources.Shared instead. + // Keep field to allow us to handle upgrade paths from old versions SharedResources *Resources - // COMPAT(0.11): Remove in 0.11 // TaskResources is the set of resources allocated to each // task. These should sum to the total Resources. Dynamic ports will be // set by the scheduler. + // Deprecated: use AllocatedResources.Tasks instead. 
+ // Keep field to allow us to handle upgrade paths from old versions TaskResources map[string]*Resources // AllocatedResources is the total resources allocated for the task group. @@ -7632,6 +7634,39 @@ func (a *Allocation) CopySkipJob() *Allocation { return a.copyImpl(false) } +func (a *Allocation) Canonicalize() { + if a.AllocatedResources == nil && a.TaskResources != nil { + ar := AllocatedResources{} + ar.Tasks = toAllocatedResources(a.TaskResources) + + if a.SharedResources != nil { + ar.Shared.DiskMB = int64(a.SharedResources.DiskMB) + ar.Shared.Networks = a.SharedResources.Networks.Copy() + } + + a.AllocatedResources = &ar + } + + // TODO: Investigate if we should canonicalize the job + // it may be out of sync with respect to the original job + // a.Job.Canonicalize() +} + +func toAllocatedResources(taskResources map[string]*Resources) map[string]*AllocatedTaskResources { + tasks := make(map[string]*AllocatedTaskResources, len(taskResources)) + + for name, tr := range taskResources { + atr := AllocatedTaskResources{} + atr.Cpu.CpuShares = int64(tr.CPU) + atr.Memory.MemoryMB = int64(tr.MemoryMB) + atr.Networks = tr.Networks.Copy() + + tasks[name] = &atr + } + + return tasks +} + func (a *Allocation) copyImpl(job bool) *Allocation { if a == nil { return nil diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index dcb2dfcc82b..e67dc4b5230 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -4089,6 +4089,67 @@ func TestAllocation_NextDelay(t *testing.T) { } +func TestAllocation_Canonicalize_Old(t *testing.T) { + alloc := MockAlloc() + alloc.AllocatedResources = nil + alloc.TaskResources = map[string]*Resources{ + "web": { + CPU: 500, + MemoryMB: 256, + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []Port{{Label: "admin", Value: 5000}}, + MBits: 50, + DynamicPorts: []Port{{Label: "http", Value: 9876}}, + }, + }, + }, + } + alloc.SharedResources = &Resources{ + 
DiskMB: 150, + } + alloc.Canonicalize() + + expected := &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 500, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 256, + }, + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []Port{{Label: "admin", Value: 5000}}, + MBits: 50, + DynamicPorts: []Port{{Label: "http", Value: 9876}}, + }, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 150, + }, + } + + require.Equal(t, expected, alloc.AllocatedResources) +} + +// TestAllocation_Canonicalize_New asserts that an alloc with latest +// schema isn't modified with Canonicalize +func TestAllocation_Canonicalize_New(t *testing.T) { + alloc := MockAlloc() + copy := alloc.Copy() + + alloc.Canonicalize() + require.Equal(t, copy, alloc) +} + func TestRescheduleTracker_Copy(t *testing.T) { type testCase struct { original *RescheduleTracker From 0a5fd78e0b91bcbfce843bf3692be077a369c0c2 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 9 Jan 2020 08:34:19 -0500 Subject: [PATCH 3/9] client: canonicalize alloc runner on RPC --- client/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/client.go b/client/client.go index fe7447dcacf..f23cc1efdc9 100644 --- a/client/client.go +++ b/client/client.go @@ -1995,6 +1995,10 @@ OUTER: // Ensure that we received all the allocations we wanted pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs)) for _, alloc := range allocsResp.Allocs { + + // handle an old Server + alloc.Canonicalize() + pulledAllocs[alloc.ID] = alloc } From 058076afd045b5aed4f4779927656c3f466da604 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 9 Jan 2020 09:25:07 -0500 Subject: [PATCH 4/9] client: stop using alloc.TaskResources Now that alloc.Canonicalize() is called in all alloc sources in the client (i.e. on state restore and RPC fetching), we no longer need to check alloc.TaskResources. 
alloc.AllocatedResources is always non-nil through alloc runner. Though, early on, we check for alloc validity, so NewTaskRunner and TaskEnv must still check. `TestClient_AddAllocError` test validates that behavior. --- client/allocrunner/taskrunner/service_hook.go | 24 ++---------- client/allocrunner/taskrunner/task_runner.go | 39 +++---------------- client/taskenv/env.go | 25 ------------ client/taskenv/env_test.go | 4 ++ 4 files changed, 14 insertions(+), 78 deletions(-) diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index 956033f3f31..02b8d75b71a 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -63,16 +63,8 @@ func newServiceHook(c serviceHookConfig) *serviceHook { delay: c.task.ShutdownDelay, } - // COMPAT(0.11): AllocatedResources was added in 0.9 so assume its set - // in 0.11. - if c.alloc.AllocatedResources != nil { - if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil { - h.networks = res.Networks - } - } else { - if res := c.alloc.TaskResources[c.task.Name]; res != nil { - h.networks = res.Networks - } + if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil { + h.networks = res.Networks } if c.alloc.DeploymentStatus != nil && c.alloc.DeploymentStatus.Canary { @@ -116,17 +108,9 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ canary = req.Alloc.DeploymentStatus.Canary } - // COMPAT(0.11): AllocatedResources was added in 0.9 so assume its set - // in 0.11. 
var networks structs.Networks - if req.Alloc.AllocatedResources != nil { - if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil { - networks = res.Networks - } - } else { - if res := req.Alloc.TaskResources[h.taskName]; res != nil { - networks = res.Networks - } + if res := req.Alloc.AllocatedResources.Tasks[h.taskName]; res != nil { + networks = res.Networks } task := req.Alloc.LookupTask(h.taskName) diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 9f6e5ac34cb..506a3aca90d 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -301,23 +301,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { } tr.taskResources = tres } else { - // COMPAT(0.11): Upgrade from 0.8 resources to 0.9+ resources - // Grab the old task resources - oldTr, ok := tr.alloc.TaskResources[tr.taskName] - if !ok { - return nil, fmt.Errorf("no task resources found on allocation") - } - - // Convert the old to new - tr.taskResources = &structs.AllocatedTaskResources{ - Cpu: structs.AllocatedCpuResources{ - CpuShares: int64(oldTr.CPU), - }, - Memory: structs.AllocatedMemoryResources{ - MemoryMB: int64(oldTr.MemoryMB), - }, - Networks: oldTr.Networks, - } + return nil, fmt.Errorf("no task resources found on allocation") } // Build the restart tracker. 
@@ -1253,15 +1237,9 @@ func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) { func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) { alloc := tr.Alloc() var allocatedMem float32 - if alloc.AllocatedResources != nil { - if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil { - // Convert to bytes to match other memory metrics - allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024 - } - } else if taskRes := alloc.TaskResources[tr.taskName]; taskRes != nil { - // COMPAT(0.11) Remove in 0.11 when TaskResources is removed - allocatedMem = float32(taskRes.MemoryMB) * 1024 * 1024 - + if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil { + // Convert to bytes to match other memory metrics + allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024 } if !tr.clientConfig.DisableTaggedMetrics { @@ -1303,13 +1281,8 @@ func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) { func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) { alloc := tr.Alloc() var allocatedCPU float32 - if alloc.AllocatedResources != nil { - if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil { - allocatedCPU = float32(taskRes.Cpu.CpuShares) - } - } else if taskRes := alloc.TaskResources[tr.taskName]; taskRes != nil { - // COMPAT(0.11) Remove in 0.11 when TaskResources is removed - allocatedCPU = float32(taskRes.CPU) + if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil { + allocatedCPU = float32(taskRes.Cpu.CpuShares) } if !tr.clientConfig.DisableTaggedMetrics { diff --git a/client/taskenv/env.go b/client/taskenv/env.go index 57402b442cf..0be24d63f1d 100644 --- a/client/taskenv/env.go +++ b/client/taskenv/env.go @@ -603,7 +603,6 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - // COMPAT(0.11): Remove in 0.11 b.otherPorts = make(map[string]string, len(tg.Tasks)*2) if 
alloc.AllocatedResources != nil { // Populate task resources @@ -645,30 +644,6 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { addGroupPort(b.otherPorts, p) } } - } else if alloc.TaskResources != nil { - if tr, ok := alloc.TaskResources[b.taskName]; ok { - // Copy networks to prevent sharing - b.networks = make([]*structs.NetworkResource, len(tr.Networks)) - for i, n := range tr.Networks { - b.networks[i] = n.Copy() - } - - } - - for taskName, resources := range alloc.TaskResources { - // Add ports from other tasks - if taskName == b.taskName { - continue - } - for _, nw := range resources.Networks { - for _, p := range nw.ReservedPorts { - addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value) - } - for _, p := range nw.DynamicPorts { - addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value) - } - } - } } upstreams := []structs.ConsulUpstream{} diff --git a/client/taskenv/env_test.go b/client/taskenv/env_test.go index f68bb05d795..03529143628 100644 --- a/client/taskenv/env_test.go +++ b/client/taskenv/env_test.go @@ -266,6 +266,10 @@ func TestEnvironment_AsList_Old(t *testing.T) { }, }, } + + // simulate canonicalization on restore or fetch + a.Canonicalize() + task := a.Job.TaskGroups[0].Tasks[0] task.Env = map[string]string{ "taskEnvKey": "taskEnvVal", From 4f36d4b308a278bdf920ce17ba0aa9343cf6cf0c Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 10 Jan 2020 10:41:12 -0500 Subject: [PATCH 5/9] canonicalize allocs from plan results too --- nomad/state/state_store.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 9d915536df9..1590a3fee3e 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -294,6 +294,11 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...) allocsToUpsert = append(allocsToUpsert, allocsPreempted...) 
+ // handle upgrade path + for _, alloc := range allocsToUpsert { + alloc.Canonicalize() + } + if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil { return err } From 3291523d8cd5b35e7d97d0ffda4c4e8dec8261df Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 15 Jan 2020 08:57:05 -0500 Subject: [PATCH 6/9] address review comments --- client/allocrunner/taskrunner/task_runner.go | 14 ++++----- client/state/state_database.go | 1 + client/taskenv/env.go | 3 ++ nomad/structs/structs.go | 31 ++++++++++---------- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 506a3aca90d..bb6c6a44596 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -294,15 +294,15 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { // Pull out the task's resources ares := tr.alloc.AllocatedResources - if ares != nil { - tres, ok := ares.Tasks[tr.taskName] - if !ok { - return nil, fmt.Errorf("no task resources found on allocation") - } - tr.taskResources = tres - } else { + if ares == nil { + return nil, fmt.Errorf("no task resources found on allocation") + } + + tres, ok := ares.Tasks[tr.taskName] + if !ok { return nil, fmt.Errorf("no task resources found on allocation") } + tr.taskResources = tres // Build the restart tracker. 
tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup) diff --git a/client/state/state_database.go b/client/state/state_database.go index c513e13da39..6d1e65fb297 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -209,6 +209,7 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m // Handle upgrade path ae.Alloc.Canonicalize() + ae.Alloc.Job.Canonicalize() allocs = append(allocs, ae.Alloc) } diff --git a/client/taskenv/env.go b/client/taskenv/env.go index 0be24d63f1d..8612651c3e4 100644 --- a/client/taskenv/env.go +++ b/client/taskenv/env.go @@ -604,6 +604,9 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) b.otherPorts = make(map[string]string, len(tg.Tasks)*2) + + // Protect against invalid allocs where AllocatedResources isn't set. + // TestClient_AddAllocError explicitly tests for this condition if alloc.AllocatedResources != nil { // Populate task resources if tr, ok := alloc.AllocatedResources.Tasks[b.taskName]; ok { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1356e75eb2f..18c60fbf63a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7634,10 +7634,24 @@ func (a *Allocation) CopySkipJob() *Allocation { return a.copyImpl(false) } +// Canonicalize Allocation to ensure fields are initialized to the expectations +// of this version of Nomad. Should be called when restoring persisted +// Allocations or receiving Allocations from Nomad agents potentially on an +// older version of Nomad. 
func (a *Allocation) Canonicalize() { if a.AllocatedResources == nil && a.TaskResources != nil { ar := AllocatedResources{} - ar.Tasks = toAllocatedResources(a.TaskResources) + + tasks := make(map[string]*AllocatedTaskResources, len(a.TaskResources)) + for name, tr := range a.TaskResources { + atr := AllocatedTaskResources{} + atr.Cpu.CpuShares = int64(tr.CPU) + atr.Memory.MemoryMB = int64(tr.MemoryMB) + atr.Networks = tr.Networks.Copy() + + tasks[name] = &atr + } + ar.Tasks = tasks if a.SharedResources != nil { ar.Shared.DiskMB = int64(a.SharedResources.DiskMB) @@ -7652,21 +7666,6 @@ func (a *Allocation) Canonicalize() { // a.Job.Canonicalize() } -func toAllocatedResources(taskResources map[string]*Resources) map[string]*AllocatedTaskResources { - tasks := make(map[string]*AllocatedTaskResources, len(taskResources)) - - for name, tr := range taskResources { - atr := AllocatedTaskResources{} - atr.Cpu.CpuShares = int64(tr.CPU) - atr.Memory.MemoryMB = int64(tr.MemoryMB) - atr.Networks = tr.Networks.Copy() - - tasks[name] = &atr - } - - return tasks -} - func (a *Allocation) copyImpl(job bool) *Allocation { if a == nil { return nil From 4813863215bab65e493147fc6c9fd34f0a78023f Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 15 Jan 2020 09:02:48 -0500 Subject: [PATCH 7/9] actually always canonicalize alloc.Job alloc.Job may be stale as well and may need to be migrated. Canonicalizing it does cost extra cycles, but the cost should be negligible. 
--- client/state/state_database.go | 1 - nomad/structs/structs.go | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/client/state/state_database.go b/client/state/state_database.go index 6d1e65fb297..c513e13da39 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -209,7 +209,6 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m // Handle upgrade path ae.Alloc.Canonicalize() - ae.Alloc.Job.Canonicalize() allocs = append(allocs, ae.Alloc) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 18c60fbf63a..9938b938b9d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7661,9 +7661,7 @@ func (a *Allocation) Canonicalize() { a.AllocatedResources = &ar } - // TODO: Investigate if we should canonicalize the job - // it may be out of sync with respect to the original job - // a.Job.Canonicalize() + a.Job.Canonicalize() } func (a *Allocation) copyImpl(job bool) *Allocation { From 438f98c0f30ca6628773a215f81794fcd63e2686 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 28 Jan 2020 09:59:05 -0500 Subject: [PATCH 8/9] client: canonicalize alloc.Job on restore There is a case for always canonicalizing alloc.Job field when canonicalizing the alloc. I'm less certain of implications though, and the job canonicalize hasn't changed for a long time. Here, we special case client restore from database as it's probably the most relevant part. When receiving an alloc from RPC, the data should be fresh enough. 
--- client/state/state_database.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/state/state_database.go b/client/state/state_database.go index c513e13da39..6d1e65fb297 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -209,6 +209,7 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m // Handle upgrade path ae.Alloc.Canonicalize() + ae.Alloc.Job.Canonicalize() allocs = append(allocs, ae.Alloc) } From 99bc650ace0b448ad3eafe619de03022af74cc69 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 28 Jan 2020 14:58:57 -0500 Subject: [PATCH 9/9] tests: run_for is already a string --- client/state/db_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/client/state/db_test.go b/client/state/db_test.go index 230c7ad869e..c37ed8bd27c 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -76,12 +76,6 @@ func TestStateDB_Allocations(t *testing.T) { alloc1 := mock.Alloc() alloc2 := mock.BatchAlloc() - //XXX Sadly roundtripping allocs loses time.Duration type - // information from the Config map[string]interface{}. As - // the mock driver itself with unmarshal run_for into the - // proper type, we can safely ignore it here. - delete(alloc2.Job.TaskGroups[0].Tasks[0].Config, "run_for") - require.NoError(db.PutAllocation(alloc1)) require.NoError(db.PutAllocation(alloc2))