Skip to content

Commit

Permalink
add Image / ImagePullPolicy to Function/Sink/Source Spec (#104)
Browse files Browse the repository at this point in the history
* bump to runner 2.7.1

* bump vendor

* stage

* allow user defined runner image

* lint style

* sink runner image & source runner image will use DefaultRunnerImage by default

* update crds

* apply DefaultJavaRunnerImage if connector with packages location

* add tests

* fix ci and align crd model gen to k8s 1.14

* bump kind k8s to 1.15.12

* revert type changes
  • Loading branch information
freeznet authored Apr 14, 2021
1 parent a023a32 commit 0750f8b
Show file tree
Hide file tree
Showing 209 changed files with 46,686 additions and 21,588 deletions.
7 changes: 7 additions & 0 deletions api/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ type FunctionSpec struct {

Messaging `json:",inline"`
Runtime `json:",inline"`

// Image is the container image used to run function pods.
// default is streamnative/pulsar-functions-java-runner
Image string `json:"image,omitempty"`

// Image pull policy, one of Always, Never, IfNotPresent, default to Always.
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
}

// FunctionStatus defines the observed state of Function
Expand Down
7 changes: 7 additions & 0 deletions api/v1alpha1/sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ type SinkSpec struct {

Messaging `json:",inline"`
Runtime `json:",inline"`

// Image is the container image used to run sink pods.
// default is streamnative/pulsar-functions-java-runner
Image string `json:"image,omitempty"`

// Image pull policy, one of Always, Never, IfNotPresent, default to Always.
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
}

// SinkStatus defines the observed state of Topic
Expand Down
7 changes: 7 additions & 0 deletions api/v1alpha1/source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ type SourceSpec struct {

Messaging `json:",inline"`
Runtime `json:",inline"`

// Image is the container image used to run source pods.
// default is streamnative/pulsar-functions-java-runner
Image string `json:"image,omitempty"`

// Image pull policy, one of Always, Never, IfNotPresent, default to Always.
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
}

// SourceStatus defines the observed state of Source
Expand Down
9,037 changes: 1,915 additions & 7,122 deletions config/crd/bases/cloud.streamnative.io_functionmeshes.yaml

Large diffs are not rendered by default.

2,727 changes: 616 additions & 2,111 deletions config/crd/bases/cloud.streamnative.io_functions.yaml

Large diffs are not rendered by default.

2,729 changes: 618 additions & 2,111 deletions config/crd/bases/cloud.streamnative.io_sinks.yaml

Large diffs are not rendered by default.

2,729 changes: 618 additions & 2,111 deletions config/crd/bases/cloud.streamnative.io_sources.yaml

Large diffs are not rendered by default.

54 changes: 41 additions & 13 deletions controllers/spec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import (
const (
EnvShardID = "SHARD_ID"
FunctionsInstanceClasspath = "pulsar.functions.instance.classpath"
DefaultRunnerImage = "streamnative/pulsar-all:2.7.0-rc-pm-3"
DefaultJavaRunnerImage = "streamnative/pulsar-functions-java-runner:2.7.0"
DefaultPythonRunnerImage = "streamnative/pulsar-functions-python-runner:2.7.0"
DefaultGoRunnerImage = "streamnative/pulsar-functions-go-runner:2.7.0"
DefaultRunnerTag = "2.7.1"
DefaultRunnerPrefix = "streamnative/"
DefaultRunnerImage = DefaultRunnerPrefix + "pulsar-all:" + DefaultRunnerTag
DefaultJavaRunnerImage = DefaultRunnerPrefix + "pulsar-functions-java-runner:" + DefaultRunnerTag
DefaultPythonRunnerImage = DefaultRunnerPrefix + "pulsar-functions-python-runner:" + DefaultRunnerTag
DefaultGoRunnerImage = DefaultRunnerPrefix + "pulsar-functions-go-runner:" + DefaultRunnerTag
PulsarAdminExecutableFile = "/pulsar/bin/pulsar-admin"
PulsarDownloadRootDir = "/pulsar"

Expand Down Expand Up @@ -560,15 +562,41 @@ func generateAnnotations(customAnnotations map[string]string) map[string]string
return annotations
}

func getFunctionRunnerImage(runtime *v1alpha1.Runtime) string {
if runtime != nil {
if runtime.Java != nil && runtime.Java.Jar != "" {
return DefaultJavaRunnerImage
} else if runtime.Python != nil && runtime.Python.Py != "" {
return DefaultPythonRunnerImage
} else if runtime.Golang != nil && runtime.Golang.Go != "" {
return DefaultGoRunnerImage
}
func getFunctionRunnerImage(spec *v1alpha1.FunctionSpec) string {
runtime := &spec.Runtime
img := spec.Image
if img != "" {
return img
} else if runtime.Java != nil && runtime.Java.Jar != "" {
return DefaultJavaRunnerImage
} else if runtime.Python != nil && runtime.Python.Py != "" {
return DefaultPythonRunnerImage
} else if runtime.Golang != nil && runtime.Golang.Go != "" {
return DefaultGoRunnerImage
}
return DefaultRunnerImage
}

func getSinkRunnerImage(spec *v1alpha1.SinkSpec) string {
img := spec.Image
if img != "" {
return img
}
if spec.Runtime.Java.Jar != "" && spec.Runtime.Java.JarLocation != "" &&
hasPackageNamePrefix(spec.Runtime.Java.JarLocation) {
return DefaultJavaRunnerImage
}
return DefaultRunnerImage
}

func getSourceRunnerImage(spec *v1alpha1.SourceSpec) string {
img := spec.Image
if img != "" {
return img
}
if spec.Runtime.Java.Jar != "" && spec.Runtime.Java.JarLocation != "" &&
hasPackageNamePrefix(spec.Runtime.Java.JarLocation) {
return DefaultJavaRunnerImage
}
return DefaultRunnerImage
}
72 changes: 66 additions & 6 deletions controllers/spec/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,84 @@ func TestGetDownloadCommand(t *testing.T) {
}

func TestGetFunctionRunnerImage(t *testing.T) {
javaRuntime := &v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
javaRuntime := v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "test",
}}
image := getFunctionRunnerImage(javaRuntime)
image := getFunctionRunnerImage(&v1alpha1.FunctionSpec{Runtime: javaRuntime})
assert.Equal(t, image, DefaultJavaRunnerImage)

pythonRuntime := &v1alpha1.Runtime{Python: &v1alpha1.PythonRuntime{
pythonRuntime := v1alpha1.Runtime{Python: &v1alpha1.PythonRuntime{
Py: "test.py",
PyLocation: "test",
}}
image = getFunctionRunnerImage(pythonRuntime)
image = getFunctionRunnerImage(&v1alpha1.FunctionSpec{Runtime: pythonRuntime})
assert.Equal(t, image, DefaultPythonRunnerImage)

goRuntime := &v1alpha1.Runtime{Golang: &v1alpha1.GoRuntime{
goRuntime := v1alpha1.Runtime{Golang: &v1alpha1.GoRuntime{
Go: "test",
GoLocation: "test",
}}
image = getFunctionRunnerImage(goRuntime)
image = getFunctionRunnerImage(&v1alpha1.FunctionSpec{Runtime: goRuntime})
assert.Equal(t, image, DefaultGoRunnerImage)
}

func TestGetSinkRunnerImage(t *testing.T) {
spec := v1alpha1.SinkSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "",
}}}
image := getSinkRunnerImage(&spec)
assert.Equal(t, image, DefaultRunnerImage)

spec = v1alpha1.SinkSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "test",
}}}
image = getSinkRunnerImage(&spec)
assert.Equal(t, image, DefaultRunnerImage)

spec = v1alpha1.SinkSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "sink://public/default/test",
}}}
image = getSinkRunnerImage(&spec)
assert.Equal(t, image, DefaultJavaRunnerImage)

spec = v1alpha1.SinkSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "",
}}, Image: "streamnative/pulsar-io-test:2.7.1"}
image = getSinkRunnerImage(&spec)
assert.Equal(t, image, "streamnative/pulsar-io-test:2.7.1")
}

func TestGetSourceRunnerImage(t *testing.T) {
spec := v1alpha1.SourceSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "",
}}}
image := getSourceRunnerImage(&spec)
assert.Equal(t, image, DefaultRunnerImage)

spec = v1alpha1.SourceSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "test",
}}}
image = getSourceRunnerImage(&spec)
assert.Equal(t, image, DefaultRunnerImage)

spec = v1alpha1.SourceSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "sink://public/default/test",
}}}
image = getSourceRunnerImage(&spec)
assert.Equal(t, image, DefaultJavaRunnerImage)

spec = v1alpha1.SourceSpec{Runtime: v1alpha1.Runtime{Java: &v1alpha1.JavaRuntime{
Jar: "test.jar",
JarLocation: "",
}}, Image: "streamnative/pulsar-io-test:2.7.1"}
image = getSourceRunnerImage(&spec)
assert.Equal(t, image, "streamnative/pulsar-io-test:2.7.1")
}
8 changes: 6 additions & 2 deletions controllers/spec/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,19 @@ func makeFunctionVolumeMounts(function *v1alpha1.Function) []corev1.VolumeMount
}

func MakeFunctionContainer(function *v1alpha1.Function) *corev1.Container {
imagePullPolicy := function.Spec.ImagePullPolicy
if imagePullPolicy == "" {
imagePullPolicy = corev1.PullIfNotPresent
}
return &corev1.Container{
// TODO new container to pull user code image and upload jars into bookkeeper
Name: "pulsar-function",
Image: getFunctionRunnerImage(&function.Spec.Runtime),
Image: getFunctionRunnerImage(&function.Spec),
Command: makeFunctionCommand(function),
Ports: []corev1.ContainerPort{GRPCPort, MetricsPort},
Env: generateContainerEnv(function.Spec.SecretsMap),
Resources: function.Spec.Resources,
ImagePullPolicy: corev1.PullIfNotPresent,
ImagePullPolicy: imagePullPolicy,
EnvFrom: generateContainerEnvFrom(function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthConfig),
VolumeMounts: makeFunctionVolumeMounts(function),
}
Expand Down
8 changes: 6 additions & 2 deletions controllers/spec/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,19 @@ func MakeSinkObjectMeta(sink *v1alpha1.Sink) *metav1.ObjectMeta {
}

func MakeSinkContainer(sink *v1alpha1.Sink) *corev1.Container {
imagePullPolicy := sink.Spec.ImagePullPolicy
if imagePullPolicy == "" {
imagePullPolicy = corev1.PullIfNotPresent
}
return &corev1.Container{
// TODO new container to pull user code image and upload jars into bookkeeper
Name: "pulsar-sink",
Image: DefaultRunnerImage,
Image: getSinkRunnerImage(&sink.Spec),
Command: MakeSinkCommand(sink),
Ports: []corev1.ContainerPort{GRPCPort, MetricsPort},
Env: generateContainerEnv(sink.Spec.SecretsMap),
Resources: sink.Spec.Resources,
ImagePullPolicy: corev1.PullIfNotPresent,
ImagePullPolicy: imagePullPolicy,
EnvFrom: generateContainerEnvFrom(sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.AuthConfig),
VolumeMounts: makeSinkVolumeMounts(sink),
}
Expand Down
8 changes: 6 additions & 2 deletions controllers/spec/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,19 @@ func MakeSourceObjectMeta(source *v1alpha1.Source) *metav1.ObjectMeta {
}

func MakeSourceContainer(source *v1alpha1.Source) *corev1.Container {
imagePullPolicy := source.Spec.ImagePullPolicy
if imagePullPolicy == "" {
imagePullPolicy = corev1.PullIfNotPresent
}
return &corev1.Container{
// TODO new container to pull user code image and upload jars into bookkeeper
Name: "pulsar-source",
Image: DefaultRunnerImage,
Image: getSourceRunnerImage(&source.Spec),
Command: makeSourceCommand(source),
Ports: []corev1.ContainerPort{GRPCPort, MetricsPort},
Env: generateContainerEnv(source.Spec.SecretsMap),
Resources: source.Spec.Resources,
ImagePullPolicy: corev1.PullIfNotPresent,
ImagePullPolicy: imagePullPolicy,
EnvFrom: generateContainerEnvFrom(source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.AuthConfig),
VolumeMounts: makeSourceVolumeMounts(source),
}
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/streamnative/function-mesh
go 1.13

require (
github.com/davecgh/go-spew v1.1.1
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v0.1.0
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.2
github.com/google/gofuzz v1.1.0
github.com/google/gofuzz v1.1.0 // indirect
github.com/onsi/ginkgo v1.14.2
github.com/onsi/gomega v1.10.4
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -20,4 +20,5 @@ require (
k8s.io/apimachinery v0.18.6
k8s.io/client-go v0.18.6
sigs.k8s.io/controller-runtime v0.6.2
sigs.k8s.io/controller-tools v0.2.4 // indirect
)
Loading

0 comments on commit 0750f8b

Please sign in to comment.