Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add model output to kcmas #423

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions pkg/agent/sysadvisor/plugin/inference/inference.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher"
borweinfetcher "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein"
"github.com/kubewharf/katalyst-core/pkg/config"
metricemitter "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/metric-emitter"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
Expand All @@ -47,8 +49,11 @@ type InferencePlugin struct {
period time.Duration
modelsResultFetchers map[string]modelresultfetcher.ModelResultFetcher

metaServer *metaserver.MetaServer
emitter metrics.MetricEmitter
metaServer *metaserver.MetaServer
metricEmitter metrics.MetricEmitter

emitterConf *metricemitter.MetricEmitterPluginConfiguration
qosConf *generic.QoSConfiguration

metaReader metacache.MetaReader
metaWriter metacache.MetaWriter
Expand All @@ -57,7 +62,6 @@ type InferencePlugin struct {
func NewInferencePlugin(pluginName string, conf *config.Configuration, extraConf interface{},
emitterPool metricspool.MetricsEmitterPool, metaServer *metaserver.MetaServer,
metaCache metacache.MetaCache) (plugin.SysAdvisorPlugin, error) {

if conf == nil || conf.InferencePluginConfiguration == nil {
return nil, fmt.Errorf("nil conf")
} else if metaServer == nil {
Expand All @@ -68,14 +72,16 @@ func NewInferencePlugin(pluginName string, conf *config.Configuration, extraConf
return nil, fmt.Errorf("nil emitterPool")
}

emitter := emitterPool.GetDefaultMetricsEmitter().WithTags("advisor-inference")
metricEmitter := emitterPool.GetDefaultMetricsEmitter().WithTags("advisor-inference")

inferencePlugin := InferencePlugin{
name: pluginName,
period: conf.InferencePluginConfiguration.SyncPeriod,
modelsResultFetchers: make(map[string]modelresultfetcher.ModelResultFetcher),
metaServer: metaServer,
emitter: emitter,
metricEmitter: metricEmitter,
emitterConf: conf.AgentConfiguration.MetricEmitterPluginConfiguration,
qosConf: conf.GenericConfiguration.QoSConfiguration,
metaReader: metaCache,
metaWriter: metaCache,
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/sysadvisor/plugin/inference/inference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestInferencePlugin_Run(t *testing.T) {
period: tt.fields.period,
modelsResultFetchers: tt.fields.modelsResultFetchers,
metaServer: tt.fields.metaServer,
emitter: tt.fields.emitter,
metricEmitter: tt.fields.emitter,
metaReader: tt.fields.metaReader,
metaWriter: tt.fields.metaWriter,
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestInferencePlugin_Name(t *testing.T) {
period: tt.fields.period,
modelsResultFetchers: tt.fields.modelsResultFetchers,
metaServer: tt.fields.metaServer,
emitter: tt.fields.emitter,
metricEmitter: tt.fields.emitter,
metaReader: tt.fields.metaReader,
metaWriter: tt.fields.metaWriter,
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestInferencePlugin_Init(t *testing.T) {
period: tt.fields.period,
modelsResultFetchers: tt.fields.modelsResultFetchers,
metaServer: tt.fields.metaServer,
emitter: tt.fields.emitter,
metricEmitter: tt.fields.emitter,
metaReader: tt.fields.metaReader,
metaWriter: tt.fields.metaWriter,
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestInferencePlugin_fetchModelResult(t *testing.T) {
period: tt.fields.period,
modelsResultFetchers: tt.fields.modelsResultFetchers,
metaServer: tt.fields.metaServer,
emitter: tt.fields.emitter,
metricEmitter: tt.fields.emitter,
metaReader: tt.fields.metaReader,
metaWriter: tt.fields.metaWriter,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,62 +187,59 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met
_ = bmrf.emitter.StoreInt64(metricSetInferenceResultFailed, 1, metrics.MetricTypeNameRaw)
return fmt.Errorf("SetInferenceResult failed with error: %v", err)
}

return nil
}

func (bmrf *BorweinModelResultFetcher) parseInferenceRespForPods(requestContainers []*types.ContainerInfo,
resp *borweininfsvc.InferenceResponse) (borweintypes.BorweinInferenceResults, error) {
resp *borweininfsvc.InferenceResponse) (*borweintypes.BorweinInferenceResults, error) {

if resp == nil || resp.PodResponseEntries == nil {
return nil, fmt.Errorf("nil resp")
}

results := make(borweintypes.BorweinInferenceResults)
results := borweintypes.NewBorweinInferenceResults()
// Typically the time diff between "call inference" and "get results" could be ignored.
results.Timestamp = time.Now().UnixMilli()
respContainersCnt := 0

for podUID, containerEntries := range resp.PodResponseEntries {
if containerEntries == nil || len(containerEntries.ContainerInferenceResults) == 0 {
return nil, fmt.Errorf("invalid containerEntries for pod: %s", podUID)
}

results[podUID] = make(map[string][]*borweininfsvc.InferenceResult)

for containerName, cResults := range containerEntries.ContainerInferenceResults {
if cResults == nil {
return nil, fmt.Errorf("invalid result for pod: %s, container: %s", podUID, containerName)
}

results[podUID][containerName] = make([]*borweininfsvc.InferenceResult, len(cResults.InferenceResults))

inferenceResults := make([]*borweininfsvc.InferenceResult, len(cResults.InferenceResults))
for idx, result := range cResults.InferenceResults {
if result == nil {
continue
}

results[podUID][containerName][idx] = proto.Clone(result).(*borweininfsvc.InferenceResult)
inferenceResults[idx] = proto.Clone(result).(*borweininfsvc.InferenceResult)
}
// TODO? should check inference results here?

results.SetInferenceResults(podUID, containerName, inferenceResults...)
}

respContainersCnt += len(results[podUID])
respContainersCnt += len(results.Results[podUID])
}

overloadCnt := 0.0
for _, podContainerResults := range results {
for _, containerResults := range podContainerResults {
for _, containerResult := range containerResults {
switch containerResult.InferenceType {
case borweininfsvc.InferenceType_ClassificationOverload:
if containerResult.IsDefault {
continue
}
if containerResult.Output > containerResult.Percentile {
overloadCnt += 1.0
}
}
results.RangeInferenceResults(func(_, _ string, result *borweininfsvc.InferenceResult) {
switch result.InferenceType {
case borweininfsvc.InferenceType_ClassificationOverload:
if result.IsDefault {
return
}
if result.Output > result.Percentile {
overloadCnt += 1.0
}
}
}
})

if len(requestContainers) > 0 {
_ = bmrf.emitter.StoreFloat64(metricInferenceResponseRatio, float64(respContainersCnt)/float64(len(requestContainers)), metrics.MetricTypeNameRaw)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,6 +36,8 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

"github.com/stretchr/testify/require"

internalfake "github.com/kubewharf/katalyst-api/pkg/client/clientset/versioned/fake"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
Expand Down Expand Up @@ -429,7 +430,7 @@ func TestBorweinModelResultFetcher_parseInferenceRespForPods(t *testing.T) {
name string
fields fields
args args
want borweintypes.BorweinInferenceResults
want *borweintypes.BorweinInferenceResults
wantErr bool
}{
{
Expand Down Expand Up @@ -459,11 +460,13 @@ func TestBorweinModelResultFetcher_parseInferenceRespForPods(t *testing.T) {
},
},
},
want: borweintypes.BorweinInferenceResults{
podUID: map[string][]*borweininfsvc.InferenceResult{
containerName: {
{
IsDefault: true,
want: &borweintypes.BorweinInferenceResults{
Results: map[string]map[string][]*borweininfsvc.InferenceResult{
podUID: {
containerName: {
{
IsDefault: true,
},
},
},
},
Expand All @@ -485,6 +488,7 @@ func TestBorweinModelResultFetcher_parseInferenceRespForPods(t *testing.T) {
t.Errorf("BorweinModelResultFetcher.parseInferenceRespForPods() error = %v, wantErr %v", err, tt.wantErr)
return
}
got.Timestamp = 0
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("BorweinModelResultFetcher.parseInferenceRespForPods() = %v, want %v", got, tt.want)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,40 @@ type BorweinParameter struct {
}

// BorweinInferenceResults is a descriptor for borwein inference results.
// the first key is the pod UID
// the second key is the container name
// the value array holds the all inference result for a container.
type BorweinInferenceResults map[string]map[string][]*borweininfsvc.InferenceResult
// BorweinInferenceResults bundles one batch of borwein inference results
// together with the time at which the batch was produced.
type BorweinInferenceResults struct {
	Timestamp int64 // Unix time in milliseconds
	// Results is a two-level map:
	// the first key is the pod UID,
	// the second key is the container name,
	// and the value slice holds all inference results for that container.
	Results map[string]map[string][]*borweininfsvc.InferenceResult
}

// NewBorweinInferenceResults returns an empty BorweinInferenceResults: the
// Timestamp is left at its zero value and the Results map is allocated so it
// can be written to immediately.
func NewBorweinInferenceResults() *BorweinInferenceResults {
	empty := new(BorweinInferenceResults)
	empty.Results = make(map[string]map[string][]*borweininfsvc.InferenceResult)
	return empty
}

// SetInferenceResults records results as the inference results of container
// containerName in pod podUID, replacing anything previously stored for that
// container.
//
// The slice of pointers is copied, so later changes to the caller's slice are
// not observed; the InferenceResult values themselves are shared, not cloned.
// The Results map and the per-pod map are allocated on demand, so the method
// is safe to call on a zero-value or struct-literal BorweinInferenceResults.
func (bir *BorweinInferenceResults) SetInferenceResults(podUID string, containerName string, results ...*borweininfsvc.InferenceResult) {
	// Writing to a nil map panics; allocate lazily for receivers that were not
	// built through NewBorweinInferenceResults.
	if bir.Results == nil {
		bir.Results = make(map[string]map[string][]*borweininfsvc.InferenceResult)
	}

	// A nil check (rather than a presence check) also covers a pod key that
	// exists but holds a nil inner map.
	podContainerResults := bir.Results[podUID]
	if podContainerResults == nil {
		podContainerResults = make(map[string][]*borweininfsvc.InferenceResult)
		bir.Results[podUID] = podContainerResults
	}

	stored := make([]*borweininfsvc.InferenceResult, len(results))
	copy(stored, results)
	podContainerResults[containerName] = stored
}

// RangeInferenceResults calls fn once for every stored inference result,
// passing the pod UID and container name the result belongs to. Entries are
// handed to fn as stored (nil entries included); the visiting order across
// pods and containers follows Go map iteration and is therefore unspecified.
func (bir *BorweinInferenceResults) RangeInferenceResults(fn func(podUID, containerName string, result *borweininfsvc.InferenceResult)) {
	for uid, perContainer := range bir.Results {
		for name, items := range perContainer {
			for i := range items {
				fn(uid, name, items[i])
			}
		}
	}
}
92 changes: 92 additions & 0 deletions pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
"context"
"fmt"

"k8s.io/klog/v2"

borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts"
borweininfsvc "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc"
borweintypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/types"
"github.com/kubewharf/katalyst-core/pkg/custom-metric/store/data"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

// modelOutputToEmit lists the names of the models whose inference results are
// looked up and emitted by modelMetric.
var modelOutputToEmit = []string{borweinconsts.ModelNameBorwein}

// modelMetric emits pod-level model inference results to kcmas.
//
// For every model named in modelOutputToEmit it reads the current inference
// results through p.metaReader. For each non-default result whose pod can be
// fetched from p.metaServer and is accepted by p.metricPod, it stores the
// model output through p.dataEmitter, tagged with the results' timestamp, the
// inference type and whether the output reached the "break line".
func (p *MetricSyncerPod) modelMetric() {
	for _, modelName := range modelOutputToEmit {
		results, err := p.metaReader.GetInferenceResult(modelName)
		if err != nil {
			// Include the underlying error so the failure can be diagnosed from the log.
			klog.Errorf("failed to get inference results of model(%s): %v", modelName, err)
			continue
		}

		switch typedResults := results.(type) {
		case *borweintypes.BorweinInferenceResults:
			// An interface holding a typed nil pointer still matches this case;
			// skip it instead of dereferencing nil while ranging.
			if typedResults == nil {
				klog.Warningf("nil inference results of model(%s)", modelName)
				continue
			}

			typedResults.RangeInferenceResults(func(podUID, containerName string, result *borweininfsvc.InferenceResult) {
				if result == nil || result.IsDefault {
					return
				}

				pod, err := p.metaServer.GetPod(context.Background(), podUID)
				if err != nil || !p.metricPod(pod) {
					return
				}

				tags := p.generateMetricTag(pod)
				// There are some cases indicate "valid_break_line":
				// 1. result.InferenceType == borweininfsvc.InferenceType_ClassificationOverload &&
				//    result.Output >= result.Percentile.
				// 2. ...
				// "Is_Default" cases have been filtered above.
				// NOTE(review): the overload counter in parseInferenceRespForPods
				// compares with a strict ">" — confirm which boundary is intended.
				validBreakLine := (result.InferenceType == borweininfsvc.InferenceType_ClassificationOverload &&
					result.Output >= result.Percentile)

				// Emission is best-effort: the store error is intentionally ignored.
				_ = p.dataEmitter.StoreFloat64(podModelInferenceResultBorwein,
					float64(result.Output),
					metrics.MetricTypeNameRaw,
					append(tags,
						metrics.MetricTag{
							Key: fmt.Sprintf("%s", data.CustomMetricLabelKeyTimestamp),
							Val: fmt.Sprintf("%v", typedResults.Timestamp),
						},
						metrics.MetricTag{
							Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "inference_type"),
							Val: result.InferenceType.String(),
						},
						metrics.MetricTag{
							Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "valid_break_line"),
							Val: fmt.Sprintf("%v", validBreakLine),
						},
					)...)
			})

		default:
			klog.Warningf("invalid model result type: %T", typedResults)
		}
	}

	general.InfofV(4, "get model metric for pod")
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (

const (
	// podMetricLabelSelectorNodeName is the "node_name" label key.
	// NOTE(review): presumably attached to pod metrics as a selector label —
	// confirm at the call sites.
	podMetricLabelSelectorNodeName = "node_name"

	// podModelInferenceResultBorwein is the metric name under which
	// modelMetric stores borwein inference outputs.
	podModelInferenceResultBorwein = "pod_borwein_inference_result"
)

// podRawMetricNameMapping maps the raw metricName (collected from agent.MetricsFetcher)
Expand Down Expand Up @@ -116,6 +118,7 @@ func (p *MetricSyncerPod) Name() string {

// Run stores ctx on the syncer and launches its periodic workers, modelMetric
// and syncChanel, each in its own goroutine. Both are driven by wait.Until
// with the configured PodSyncPeriod and stop when ctx is done.
func (p *MetricSyncerPod) Run(ctx context.Context) {
	p.ctx = ctx

	period := p.emitterConf.PodSyncPeriod
	stopCh := ctx.Done()
	for _, worker := range []func(){p.modelMetric, p.syncChanel} {
		go wait.Until(worker, period, stopCh)
	}
}

Expand Down
Loading