Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Image / ImagePullPolicy to Function/Sink/Source Spec #104

Merged
merged 12 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ type FunctionSpec struct {

Messaging `json:",inline"`
Runtime `json:",inline"`

// Image is the container image used to run function pods.
// default is streamnative/pulsar-functions-java-runner
Image string `json:"image,omitempty"`

// Image pull policy, one of Always, Never, IfNotPresent, default to Always.
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
}

// FunctionStatus defines the observed state of Function
Expand Down
7 changes: 7 additions & 0 deletions api/v1alpha1/sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ type SinkSpec struct {

Messaging `json:",inline"`
Runtime `json:",inline"`

// Image is the container image used to run sink pods.
// default is streamnative/pulsar-functions-java-runner
Image string `json:"image,omitempty"`

// Image pull policy, one of Always, Never, IfNotPresent, default to Always.
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
}

// SinkStatus defines the observed state of Topic
Expand Down
7 changes: 7 additions & 0 deletions api/v1alpha1/source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ type SourceSpec struct {

Messaging `json:",inline"`
Runtime `json:",inline"`

// Image is the container image used to run source pods.
// default is streamnative/pulsar-functions-java-runner
Image string `json:"image,omitempty"`

// Image pull policy, one of Always, Never, IfNotPresent, default to Always.
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
}

// SourceStatus defines the observed state of Source
Expand Down
9,037 changes: 1,915 additions & 7,122 deletions config/crd/bases/cloud.streamnative.io_functionmeshes.yaml

Large diffs are not rendered by default.

2,727 changes: 616 additions & 2,111 deletions config/crd/bases/cloud.streamnative.io_functions.yaml

Large diffs are not rendered by default.

2,729 changes: 618 additions & 2,111 deletions config/crd/bases/cloud.streamnative.io_sinks.yaml

Large diffs are not rendered by default.

2,729 changes: 618 additions & 2,111 deletions config/crd/bases/cloud.streamnative.io_sources.yaml

Large diffs are not rendered by default.

54 changes: 41 additions & 13 deletions controllers/spec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import (
const (
EnvShardID = "SHARD_ID"
FunctionsInstanceClasspath = "pulsar.functions.instance.classpath"
DefaultRunnerImage = "streamnative/pulsar-all:2.7.0-rc-pm-3"
DefaultJavaRunnerImage = "streamnative/pulsar-functions-java-runner:2.7.0"
DefaultPythonRunnerImage = "streamnative/pulsar-functions-python-runner:2.7.0"
DefaultGoRunnerImage = "streamnative/pulsar-functions-go-runner:2.7.0"
DefaultRunnerTag = "2.7.1"
DefaultRunnerPrefix = "streamnative/"
DefaultRunnerImage = DefaultRunnerPrefix + "pulsar-all:" + DefaultRunnerTag
DefaultJavaRunnerImage = DefaultRunnerPrefix + "pulsar-functions-java-runner:" + DefaultRunnerTag
DefaultPythonRunnerImage = DefaultRunnerPrefix + "pulsar-functions-python-runner:" + DefaultRunnerTag
DefaultGoRunnerImage = DefaultRunnerPrefix + "pulsar-functions-go-runner:" + DefaultRunnerTag
PulsarAdminExecutableFile = "/pulsar/bin/pulsar-admin"
PulsarDownloadRootDir = "/pulsar"

Expand Down Expand Up @@ -560,15 +562,41 @@ func generateAnnotations(customAnnotations map[string]string) map[string]string
return annotations
}

func getFunctionRunnerImage(runtime *v1alpha1.Runtime) string {
if runtime != nil {
if runtime.Java != nil && runtime.Java.Jar != "" {
return DefaultJavaRunnerImage
} else if runtime.Python != nil && runtime.Python.Py != "" {
return DefaultPythonRunnerImage
} else if runtime.Golang != nil && runtime.Golang.Go != "" {
return DefaultGoRunnerImage
}
func getFunctionRunnerImage(spec *v1alpha1.FunctionSpec) string {
runtime := &spec.Runtime
img := spec.Image
if img != "" {
return img
} else if runtime.Java != nil && runtime.Java.Jar != "" {
return DefaultJavaRunnerImage
} else if runtime.Python != nil && runtime.Python.Py != "" {
return DefaultPythonRunnerImage
} else if runtime.Golang != nil && runtime.Golang.Go != "" {
return DefaultGoRunnerImage
}
return DefaultRunnerImage
}

func getSinkRunnerImage(spec *v1alpha1.SinkSpec) string {
img := spec.Image
if img != "" {
return img
}
if spec.Runtime.Java.Jar != "" && spec.Runtime.Java.JarLocation != "" &&
hasPackageNamePrefix(spec.Runtime.Java.JarLocation) {
return DefaultJavaRunnerImage
}
return DefaultRunnerImage
}

func getSourceRunnerImage(spec *v1alpha1.SourceSpec) string {
img := spec.Image
if img != "" {
return img
}
if spec.Runtime.Java.Jar != "" && spec.Runtime.Java.JarLocation != "" &&
hasPackageNamePrefix(spec.Runtime.Java.JarLocation) {
return DefaultJavaRunnerImage
}
return DefaultRunnerImage
}
72 changes: 66 additions & 6 deletions controllers/spec/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,84 @@ func TestGetDownloadCommand(t *testing.T) {
}

func TestGetFunctionRunnerImage(t *testing.T) {
javaRuntime := &v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
javaRuntime := v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "test",
}}
image := getFunctionRunnerImage(javaRuntime)
image := getFunctionRunnerImage(&v1alpha1.FunctionSpec{Runtime: javaRuntime})
assert.Equal(t, image, DefaultJavaRunnerImage)

pythonRuntime := &v1alpha1.Runtime{Python: &v1alpha1.PythonRuntime{
pythonRuntime := v1alpha1.Runtime{Python: &v1alpha1.PythonRuntime{
Py: "test.py",
PyLocation: "test",
}}
image = getFunctionRunnerImage(pythonRuntime)
image = getFunctionRunnerImage(&v1alpha1.FunctionSpec{Runtime: pythonRuntime})
assert.Equal(t, image, DefaultPythonRunnerImage)

goRuntime := &v1alpha1.Runtime{Golang: &v1alpha1.GoRuntime{
goRuntime := v1alpha1.Runtime{Golang: &v1alpha1.GoRuntime{
Go: "test",
GoLocation: "test",
}}
image = getFunctionRunnerImage(goRuntime)
image = getFunctionRunnerImage(&v1alpha1.FunctionSpec{Runtime: goRuntime})
assert.Equal(t, image, DefaultGoRunnerImage)
}

func TestGetSinkRunnerImage(t *testing.T) {
spec := v1alpha1.SinkSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "",
}}}
image := getSinkRunnerImage(&spec)
assert.Equal(t, image, DefaultRunnerImage)

spec = v1alpha1.SinkSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "test",
}}}
image = getSinkRunnerImage(&spec)
assert.Equal(t, image, DefaultRunnerImage)

spec = v1alpha1.SinkSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "sink://public/default/test",
}}}
image = getSinkRunnerImage(&spec)
assert.Equal(t, image, DefaultJavaRunnerImage)

spec = v1alpha1.SinkSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "",
}}, Image: "streamnative/pulsar-io-test:2.7.1"}
image = getSinkRunnerImage(&spec)
assert.Equal(t, image, "streamnative/pulsar-io-test:2.7.1")
}

func TestGetSourceRunnerImage(t *testing.T) {
spec := v1alpha1.SourceSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "",
}}}
image := getSourceRunnerImage(&spec)
assert.Equal(t, image, DefaultRunnerImage)

spec = v1alpha1.SourceSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "test",
}}}
image = getSourceRunnerImage(&spec)
assert.Equal(t, image, DefaultRunnerImage)

spec = v1alpha1.SourceSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "sink://public/default/test",
}}}
image = getSourceRunnerImage(&spec)
assert.Equal(t, image, DefaultJavaRunnerImage)

spec = v1alpha1.SourceSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "",
}}, Image: "streamnative/pulsar-io-test:2.7.1"}
image = getSourceRunnerImage(&spec)
assert.Equal(t, image, "streamnative/pulsar-io-test:2.7.1")
}
8 changes: 6 additions & 2 deletions controllers/spec/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,19 @@ func makeFunctionVolumeMounts(function *v1alpha1.Function) []corev1.VolumeMount
}

func MakeFunctionContainer(function *v1alpha1.Function) *corev1.Container {
imagePullPolicy := function.Spec.ImagePullPolicy
if imagePullPolicy == "" {
imagePullPolicy = corev1.PullIfNotPresent
}
return &corev1.Container{
// TODO new container to pull user code image and upload jars into bookkeeper
Name: "pulsar-function",
Image: getFunctionRunnerImage(&function.Spec.Runtime),
Image: getFunctionRunnerImage(&function.Spec),
Command: makeFunctionCommand(function),
Ports: []corev1.ContainerPort{GRPCPort, MetricsPort},
Env: generateContainerEnv(function.Spec.SecretsMap),
Resources: function.Spec.Resources,
ImagePullPolicy: corev1.PullIfNotPresent,
ImagePullPolicy: imagePullPolicy,
EnvFrom: generateContainerEnvFrom(function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthConfig),
VolumeMounts: makeFunctionVolumeMounts(function),
}
Expand Down
8 changes: 6 additions & 2 deletions controllers/spec/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,19 @@ func MakeSinkObjectMeta(sink *v1alpha1.Sink) *metav1.ObjectMeta {
}

func MakeSinkContainer(sink *v1alpha1.Sink) *corev1.Container {
imagePullPolicy := sink.Spec.ImagePullPolicy
if imagePullPolicy == "" {
imagePullPolicy = corev1.PullIfNotPresent
}
return &corev1.Container{
// TODO new container to pull user code image and upload jars into bookkeeper
Name: "pulsar-sink",
Image: DefaultRunnerImage,
Image: getSinkRunnerImage(&sink.Spec),
Command: MakeSinkCommand(sink),
Ports: []corev1.ContainerPort{GRPCPort, MetricsPort},
Env: generateContainerEnv(sink.Spec.SecretsMap),
Resources: sink.Spec.Resources,
ImagePullPolicy: corev1.PullIfNotPresent,
ImagePullPolicy: imagePullPolicy,
EnvFrom: generateContainerEnvFrom(sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.AuthConfig),
VolumeMounts: makeSinkVolumeMounts(sink),
}
Expand Down
8 changes: 6 additions & 2 deletions controllers/spec/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,19 @@ func MakeSourceObjectMeta(source *v1alpha1.Source) *metav1.ObjectMeta {
}

func MakeSourceContainer(source *v1alpha1.Source) *corev1.Container {
imagePullPolicy := source.Spec.ImagePullPolicy
if imagePullPolicy == "" {
imagePullPolicy = corev1.PullIfNotPresent
}
return &corev1.Container{
// TODO new container to pull user code image and upload jars into bookkeeper
Name: "pulsar-source",
Image: DefaultRunnerImage,
Image: getSourceRunnerImage(&source.Spec),
Command: makeSourceCommand(source),
Ports: []corev1.ContainerPort{GRPCPort, MetricsPort},
Env: generateContainerEnv(source.Spec.SecretsMap),
Resources: source.Spec.Resources,
ImagePullPolicy: corev1.PullIfNotPresent,
ImagePullPolicy: imagePullPolicy,
EnvFrom: generateContainerEnvFrom(source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.AuthConfig),
VolumeMounts: makeSourceVolumeMounts(source),
}
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/streamnative/function-mesh
go 1.13

require (
github.com/davecgh/go-spew v1.1.1
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v0.1.0
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.2
github.com/google/gofuzz v1.1.0
github.com/google/gofuzz v1.1.0 // indirect
github.com/onsi/ginkgo v1.14.2
github.com/onsi/gomega v1.10.4
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -20,4 +20,5 @@ require (
k8s.io/apimachinery v0.18.6
k8s.io/client-go v0.18.6
sigs.k8s.io/controller-runtime v0.6.2
sigs.k8s.io/controller-tools v0.2.4 // indirect
)
Loading