Skip to content

Commit

Permalink
fix(sysadvisor): make resource updating not blocked by resource server
Browse files Browse the repository at this point in the history
If qrm plugin now listwatch resource server, updating process will be blocked.

Signed-off-by: linzhecheng <linzhecheng@bytedance.com>
  • Loading branch information
cheney-lin committed Jul 8, 2023
1 parent 8bcb35a commit df16ff1
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (o *MemoryAdvisorOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&o.MemoryHeadroomPolicyPriority, "memory-headroom-policy-priority", o.MemoryHeadroomPolicyPriority,
"policy memory advisor to estimate resource headroom, sorted by priority descending order, should be formatted as 'policy1,policy2'")
o.MemoryHeadroomPolicyOptions.AddFlags(fs)
fs.StringSliceVar(&o.MemoryAdvisorPlugins, "memory-advisor-plguins", o.MemoryAdvisorPlugins,
fs.StringSliceVar(&o.MemoryAdvisorPlugins, "memory-advisor-plugins", o.MemoryAdvisorPlugins,
"memory advisor plugins to use.")
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,12 @@ func (cra *cpuResourceAdvisor) update() {
klog.Errorf("[qosaware-cpu] assemble provision failed: %v", err)
return
}
cra.sendCh <- calculationResult
klog.Infof("[qosaware-cpu] notify cpu server: %+v", calculationResult)
select {
case cra.sendCh <- calculationResult:
general.Infof("notify cpu server: %+v", calculationResult)
default:
general.Errorf("channel is full")
}
}

// setIsolatedContainers get isolation status from isolator and update into containers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package provisionassembler

import (
"time"

"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
Expand Down Expand Up @@ -62,6 +64,7 @@ func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalcul

calculationResult := types.InternalCPUCalculationResult{
PoolEntries: make(map[string]map[int]int),
TimeStamp: time.Now(),
}

// fill in reserve pool entry
Expand Down
10 changes: 8 additions & 2 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,19 @@ func (ra *memoryResourceAdvisor) GetHeadroom() (resource.Quantity, error) {

func (ra *memoryResourceAdvisor) sendAdvices() {
// send to server
result := types.InternalMemoryCalculationResult{}
result := types.InternalMemoryCalculationResult{TimeStamp: time.Now()}
for _, plugin := range ra.plugins {
advices := plugin.GetAdvices()
result.ContainerEntries = append(result.ContainerEntries, advices.ContainerEntries...)
result.ExtraEntries = append(result.ExtraEntries, advices.ExtraEntries...)
}
ra.sendChan <- result

select {
case ra.sendChan <- result:
general.Infof("notify memory server: %+v", result)
default:
general.Errorf("channel is full")
}
}

func (ra *memoryResourceAdvisor) update() {
Expand Down
6 changes: 6 additions & 0 deletions pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package server
import (
"context"
"fmt"
"time"

"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -90,6 +91,11 @@ func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvi
klog.Infof("[qosaware-server-cpu] recv channel is closed")
return nil
}
if advisorResp.TimeStamp.Add(cs.period * 2).Before(time.Now()) {
general.Warningf("advisorResp is expired")
continue
}

klog.Infof("[qosaware-server-cpu] get advisor update: %+v", advisorResp)

calculationEntriesMap := make(map[string]*cpuadvisor.CalculationEntries)
Expand Down
39 changes: 23 additions & 16 deletions pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"reflect"
"sort"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -234,10 +235,12 @@ func TestCPUServerListAndWatch(t *testing.T) {
{
name: "reclaim pool with shared pool",
empty: &advisorsvc.Empty{},
provision: types.InternalCPUCalculationResult{PoolEntries: map[string]map[int]int{
state.PoolNameShare: {-1: 2},
state.PoolNameReclaim: {-1: 4},
}},
provision: types.InternalCPUCalculationResult{
TimeStamp: time.Now(),
PoolEntries: map[string]map[int]int{
state.PoolNameShare: {-1: 2},
state.PoolNameReclaim: {-1: 4},
}},
wantErr: false,
wantRes: &cpuadvisor.ListAndWatchResponse{
Entries: map[string]*cpuadvisor.CalculationEntries{
Expand Down Expand Up @@ -279,12 +282,14 @@ func TestCPUServerListAndWatch(t *testing.T) {
{
name: "reclaim pool with dedicated pod",
empty: &advisorsvc.Empty{},
provision: types.InternalCPUCalculationResult{PoolEntries: map[string]map[int]int{
state.PoolNameReclaim: {
0: 4,
1: 8,
},
}},
provision: types.InternalCPUCalculationResult{
TimeStamp: time.Now(),
PoolEntries: map[string]map[int]int{
state.PoolNameReclaim: {
0: 4,
1: 8,
},
}},
infos: []*ContainerInfo{
{
request: &advisorsvc.AddContainerRequest{
Expand Down Expand Up @@ -389,12 +394,14 @@ func TestCPUServerListAndWatch(t *testing.T) {
{
name: "reclaim pool colocated with dedicated pod(2 containers)",
empty: &advisorsvc.Empty{},
provision: types.InternalCPUCalculationResult{PoolEntries: map[string]map[int]int{
state.PoolNameReclaim: {
0: 4,
1: 8,
},
}},
provision: types.InternalCPUCalculationResult{
TimeStamp: time.Now(),
PoolEntries: map[string]map[int]int{
state.PoolNameReclaim: {
0: 4,
1: 8,
},
}},
infos: []*ContainerInfo{
{
request: &advisorsvc.AddContainerRequest{
Expand Down
15 changes: 13 additions & 2 deletions pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package server

import (
"fmt"
"time"

"google.golang.org/grpc"

Expand Down Expand Up @@ -70,11 +71,21 @@ func (ms *memoryServer) ListAndWatch(_ *advisorsvc.Empty, server advisorsvc.Advi
general.Infof("%v recv channel is closed", ms.name)
return nil
}
if advisorResp.TimeStamp.Add(ms.period * 2).Before(time.Now()) {
general.Warningf("advisorResp is expired")
continue
}
resp := ms.assembleResponse(&advisorResp)
if resp != nil {
server.Send(resp)
if err := server.Send(resp); err != nil {
general.Errorf("send response failed: %v", err)
_ = ms.emitter.StoreInt64(ms.genMetricsName(metricServerLWSendResponseFailed), int64(ms.period.Seconds()), metrics.MetricTypeNameCount)
return err
}

general.Infof("send calculation result: %v", general.ToString(resp))
_ = ms.emitter.StoreInt64(ms.genMetricsName(metricServerLWSendResponseSucceeded), int64(ms.period.Seconds()), metrics.MetricTypeNameCount)
}
return nil
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io/ioutil"
"reflect"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -104,6 +105,7 @@ func TestMemoryServerListAndWatch(t *testing.T) {
name: "normal",
empty: &advisorsvc.Empty{},
provision: types.InternalMemoryCalculationResult{
TimeStamp: time.Now(),
ContainerEntries: []types.ContainerMemoryAdvices{
{
PodUID: "pod1",
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/sysadvisor/types/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package types

import (
"time"

"k8s.io/apimachinery/pkg/util/sets"

"github.com/kubewharf/katalyst-core/pkg/util/machine"
Expand Down Expand Up @@ -139,6 +141,7 @@ type RegionInfo struct {
// calculation result
type InternalCPUCalculationResult struct {
PoolEntries map[string]map[int]int // map[poolName][numaId]cpuSize
TimeStamp time.Time
}

// ControlEssentials defines essential metrics for cpu advisor feedback control
Expand Down
7 changes: 6 additions & 1 deletion pkg/agent/sysadvisor/types/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ limitations under the License.

package types

import "k8s.io/apimachinery/pkg/api/resource"
import (
"time"

"k8s.io/apimachinery/pkg/api/resource"
)

type MemoryAdvisorPluginName string
type MemoryPressureState int
Expand Down Expand Up @@ -57,4 +61,5 @@ type ExtraMemoryAdvices struct {
type InternalMemoryCalculationResult struct {
ContainerEntries []ContainerMemoryAdvices
ExtraEntries []ExtraMemoryAdvices
TimeStamp time.Time
}

0 comments on commit df16ff1

Please sign in to comment.