Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Use ToK8sResourceRequirements from flyteplugins
Browse files Browse the repository at this point in the history
  • Loading branch information
bstadlbauer committed Dec 11, 2022
1 parent 29db28d commit 72f18f0
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 146 deletions.
4 changes: 2 additions & 2 deletions pkg/compiler/transformers/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/compiler/common"
"github.com/flyteorg/flytepropeller/pkg/compiler/errors"
"github.com/flyteorg/flytepropeller/pkg/utils"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/go-test/deep"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -50,7 +50,7 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile
}
}

res, err := utils.ToK8sResourceRequirements(resources)
res, err := flytek8s.ToK8sResourceRequirements(resources)
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
return nil, false
Expand Down
55 changes: 0 additions & 55 deletions pkg/utils/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/kubernetes"
Expand All @@ -42,60 +41,6 @@ func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar {
return envVars
}

// TODO we should modify the container resources to contain a map of enum values?
// Also we should probably create tolerations / taints, but we could do that as a post process
func ToK8sResourceList(resources []*core.Resources_ResourceEntry) (v1.ResourceList, error) {
k8sResources := make(v1.ResourceList, len(resources))
for _, r := range resources {
rVal := r.Value
v, err := resource.ParseQuantity(rVal)
if err != nil {
return nil, errors.Wrap(err, "Failed to parse resource as a valid quantity.")
}
switch r.Name {
case core.Resources_CPU:
if !v.IsZero() {
k8sResources[v1.ResourceCPU] = v
}
case core.Resources_MEMORY:
if !v.IsZero() {
k8sResources[v1.ResourceMemory] = v
}
case core.Resources_STORAGE:
if !v.IsZero() {
k8sResources[v1.ResourceStorage] = v
}
case core.Resources_GPU:
if !v.IsZero() {
k8sResources[ResourceNvidiaGPU] = v
}
case core.Resources_EPHEMERAL_STORAGE:
if !v.IsZero() {
k8sResources[v1.ResourceEphemeralStorage] = v
}
}
}
return k8sResources, nil
}

func ToK8sResourceRequirements(resources *core.Resources) (*v1.ResourceRequirements, error) {
res := &v1.ResourceRequirements{}
if resources == nil {
return res, nil
}
req, err := ToK8sResourceList(resources.Requests)
if err != nil {
return res, err
}
lim, err := ToK8sResourceList(resources.Limits)
if err != nil {
return res, err
}
res.Limits = lim
res.Requests = req
return res, nil
}

// GetContainer searches the provided pod spec for a container with the specified name
func GetContainer(pod *v1.PodSpec, containerName string) (*v1.Container, error) {
for i := 0; i < len(pod.Containers); i++ {
Expand Down
89 changes: 0 additions & 89 deletions pkg/utils/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,95 +30,6 @@ func TestToK8sEnvVar(t *testing.T) {
assert.Empty(t, e)
}

func TestToK8sResourceList(t *testing.T) {
{
r, err := ToK8sResourceList([]*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "250m"},
{Name: core.Resources_GPU, Value: "1"},
{Name: core.Resources_MEMORY, Value: "1024Mi"},
{Name: core.Resources_STORAGE, Value: "1024Mi"},
{Name: core.Resources_EPHEMERAL_STORAGE, Value: "1024Mi"},
})

assert.NoError(t, err)
assert.NotEmpty(t, r)
assert.NotNil(t, r[v1.ResourceCPU])
assert.Equal(t, resource.MustParse("250m"), r[v1.ResourceCPU])
assert.Equal(t, resource.MustParse("1"), r[ResourceNvidiaGPU])
assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceMemory])
assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceStorage])
assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceEphemeralStorage])
}
{
r, err := ToK8sResourceList([]*core.Resources_ResourceEntry{})
assert.NoError(t, err)
assert.Empty(t, r)
}
{
_, err := ToK8sResourceList([]*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "250x"},
})
assert.Error(t, err)
}

}

func TestToK8sResourceRequirements(t *testing.T) {

{
r, err := ToK8sResourceRequirements(nil)
assert.NoError(t, err)
assert.NotNil(t, r)
assert.Empty(t, r.Limits)
assert.Empty(t, r.Requests)
}
{
r, err := ToK8sResourceRequirements(&core.Resources{
Requests: nil,
Limits: nil,
})
assert.NoError(t, err)
assert.NotNil(t, r)
assert.Empty(t, r.Limits)
assert.Empty(t, r.Requests)
}
{
r, err := ToK8sResourceRequirements(&core.Resources{
Requests: []*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "250m"},
},
Limits: []*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "1024m"},
},
})
assert.NoError(t, err)
assert.NotNil(t, r)
assert.Equal(t, resource.MustParse("250m"), r.Requests[v1.ResourceCPU])
assert.Equal(t, resource.MustParse("1024m"), r.Limits[v1.ResourceCPU])
}
{
_, err := ToK8sResourceRequirements(&core.Resources{
Requests: []*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "blah"},
},
Limits: []*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "1024m"},
},
})
assert.Error(t, err)
}
{
_, err := ToK8sResourceRequirements(&core.Resources{
Requests: []*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "250m"},
},
Limits: []*core.Resources_ResourceEntry{
{Name: core.Resources_CPU, Value: "blah"},
},
})
assert.Error(t, err)
}
}

func TestGetProtoTime(t *testing.T) {
assert.NotNil(t, GetProtoTime(nil))
Expand Down

0 comments on commit 72f18f0

Please sign in to comment.