diff --git a/.github/workflows/test-function-key-based-batcher.yml b/.github/workflows/test-function-key-based-batcher.yml new file mode 100644 index 000000000..f11b39fdc --- /dev/null +++ b/.github/workflows/test-function-key-based-batcher.yml @@ -0,0 +1,56 @@ +name: Precommit - Test Function Kind with Key Based Batcher +on: + pull_request: + branches: + - '*' +jobs: + lint-test: + runs-on: ubuntu-latest + steps: + - name: clean disk + run: | + sudo swapoff -a + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc + sudo apt clean + docker rmi $(docker images -q) -f + df -h + + - name: Checkout + uses: actions/checkout@v2 + with: + fetch-depth: 0 + ref: ${{ github.event.pull_request.head.sha }} + + - name: Deploy k8s cluster env + run: | + .ci/deploy_pulsar_cluster.sh + + - name: Build runner images + run: | + PULSAR_IMAGE_TAG=2.7.0 PULSAR_IMAGE=apachepulsar/pulsar-all KIND_PUSH=true images/build.sh + + - name: Install operator-sdk + run: | + RELEASE_VERSION=v1.2.0 + curl -LO https://github.com/operator-framework/operator-sdk/releases/download/${RELEASE_VERSION}/operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu + chmod +x operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu && sudo mkdir -p /usr/local/bin/ && sudo cp operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu /usr/local/bin/operator-sdk && rm operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu + + # - name: Add CRD, controller or webhooks + # run: | + # operator-sdk create api --group cloud --version v1alpha1 --kind Function --resource=true --controller=true + # operator-sdk create webhook --group compute.functionmesh.io --version v1alpha1 --kind Function --defaulting --programmatic-validation + + - name: Deploy function mesh server + run: | + make generate + make install + nohup make run & + + - name: Test Function kind + run: | + kubectl apply -f config/samples/compute_v1alpha1_function_key_based_batcher.yaml + kubectl get all + + - name: Verfy Function Mesh + run: | + .ci/verify_function_mesh.sh java-function-batcher-sample diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index 933192854..7e33bf5e3 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -134,6 +134,7 @@ type ProducerConfig struct { MaxPendingMessagesAcrossPartitions int32 `json:"maxPendingMessagesAcrossPartitions,omitempty"` UseThreadLocalProducers bool `json:"useThreadLocalProducers,omitempty"` CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty"` + BatchBuilder string `json:"batchBuilder,omitempty"` } type CryptoConfig struct { diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index cb96081ef..b4b743da8 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -164,6 +164,8 @@ spec: type: object producerConf: properties: + batchBuilder: + type: string cryptoConfig: properties: consumerCryptoFailureAction: @@ -5865,6 +5867,8 @@ spec: type: object producerConf: properties: + batchBuilder: + type: string cryptoConfig: properties: consumerCryptoFailureAction: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 232c0452d..6df4dc7d0 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -164,6 +164,8 @@ spec: type: object producerConf: properties: + batchBuilder: + type: string cryptoConfig: properties: consumerCryptoFailureAction: diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 03c233524..962c34457 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -76,6 +76,8 @@ spec: type: object producerConf: properties: + batchBuilder: + type: string cryptoConfig: properties: consumerCryptoFailureAction: diff --git a/config/samples/compute_v1alpha1_function_key_based_batcher.yaml b/config/samples/compute_v1alpha1_function_key_based_batcher.yaml new file mode 100644 index 000000000..9419fbdc0 --- /dev/null +++ b/config/samples/compute_v1alpha1_function_key_based_batcher.yaml @@ -0,0 +1,98 @@ +apiVersion: compute.functionmesh.io/v1alpha1 +kind: Function +metadata: + name: java-function-batcher-sample + namespace: default +spec: + className: org.apache.pulsar.functions.api.examples.ExclamationFunction + image: streamnative/pulsar-all:2.7.1 + sourceType: java.lang.String + sinkType: java.lang.String + forwardSourceMessageProperty: true + MaxPendingAsyncRequests: 1000 + replicas: 1 + maxReplicas: 5 + logTopic: persistent://public/default/logging-function-logs + input: + topics: + - persistent://public/default/java-function-input-topic + output: + topic: persistent://public/default/java-function-output-topic + producerConf: + batchBuilder: "KEY_BASED" + resources: + requests: + cpu: "0.1" + memory: 1G + limits: + cpu: "0.2" + memory: 1.1G + # each secret will be loaded ad an env variable from the `path` secret with the `key` in that secret in the name of `name` + secretsMap: + "name": + path: "test-secret" + key: "username" + "pwd": + path: "test-secret" + key: "password" + pulsar: + pulsarConfig: "test-pulsar" + #authConfig: "test-auth" + volumeMounts: + - mountPath: /cache + name: cache-volume + pod: + labels: + "locaction": "mtv" + annotations: + "managed-function": "true" + volumes: + - name: cache-volume + emptyDir: {} + imagePullSecrets: + - name: regcred + initContainers: + - name: init-function + image: busybox:1.28 + command: ['sh', '-c', 'echo The app is running! && sleep 30'] + sidecars: + - name: sidecar-function + image: busybox:1.28 + command: ['sh', '-c', 'echo The app is running! && sleep 30000'] + java: + jar: /pulsar/examples/api-examples.jar + jarLocation: "" + # use package name: + # jarLocation: function://public/default/nul-test-java-function@v1 + # to be delete & use admission hook + clusterName: test-pulsar + autoAck: true +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-pulsar +data: + webServiceURL: http://test-pulsar-broker.default.svc.cluster.local:8080 + brokerServiceURL: pulsar://test-pulsar-broker.default.svc.cluster.local:6650 +#--- +#apiVersion: v1 +#kind: ConfigMap +#metadata: +# name: test-auth +#data: +# clientAuthenticationPlugin: "abc" +# clientAuthenticationParameters: "xyz" +# tlsTrustCertsFilePath: "uvw" +# useTls: "true" +# tlsAllowInsecureConnection: "false" +# tlsHostnameVerificationEnable: "true" +--- +apiVersion: v1 +data: + username: YWRtaW4= + password: MWYyZDFlMmU2N2Rm +kind: Secret +metadata: + name: test-secret +type: Opaque diff --git a/controllers/function_controller_test.go b/controllers/function_controller_test.go index 7e051809b..3c25ea1b4 100644 --- a/controllers/function_controller_test.go +++ b/controllers/function_controller_test.go @@ -70,6 +70,18 @@ var _ = Describe("Function Controller (E2E)", func() { }) }) +var _ = Describe("Function Controller (Batcher)", func() { + Context("Function With Batcher Item", func() { + configs := makeSamplePulsarConfig() + function := makeFunctionSampleWithKeyBasedBatcher() + + createFunctionConfigMap(configs) + createFunction(function) + deleteFunction(function) + deleteFunctionConfigMap(configs) + }) +}) + func createFunction(function *v1alpha1.Function) { if function.Status.Conditions == nil { function.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index 9a48b3509..6988b1fad 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -151,6 +151,7 @@ func generateFunctionOutputSpec(function *v1alpha1.Function) *proto.SinkSpec { MaxPendingMessagesAcrossPartitions: function.Spec.Output.ProducerConf.MaxPendingMessagesAcrossPartitions, UseThreadLocalProducers: function.Spec.Output.ProducerConf.UseThreadLocalProducers, CryptoSpec: generateCryptoSpec(function.Spec.Output.ProducerConf.CryptoConfig), + BatchBuilder: function.Spec.Output.ProducerConf.BatchBuilder, } sinkSpec.ProducerSpec = producerConfig @@ -201,6 +202,7 @@ func generateSourceOutputSpec(source *v1alpha1.Source) *proto.SinkSpec { MaxPendingMessagesAcrossPartitions: source.Spec.Output.ProducerConf.MaxPendingMessagesAcrossPartitions, UseThreadLocalProducers: source.Spec.Output.ProducerConf.UseThreadLocalProducers, CryptoSpec: cryptoSpec, + BatchBuilder: source.Spec.Output.ProducerConf.BatchBuilder, } } return &proto.SinkSpec{ diff --git a/controllers/test_utils_test.go b/controllers/test_utils_test.go index d96383612..c1c0b0b6f 100644 --- a/controllers/test_utils_test.go +++ b/controllers/test_utils_test.go @@ -136,6 +136,17 @@ func makeFunctionSampleWithCryptoEnabled() *v1alpha1.Function { return function } +func makeFunctionSampleWithKeyBasedBatcher() *v1alpha1.Function { + function := makeFunctionSample(TestFunctionName) + function.Spec.Output = v1alpha1.OutputConf{ + Topic: "persistent://public/default/java-function-output-topic", + ProducerConf: &v1alpha1.ProducerConfig{ + BatchBuilder: "KEY_BASED", + }, + } + return function +} + func makeFunctionMeshSample() *v1alpha1.FunctionMesh { inputTopic := "persistent://public/default/functionmesh-input-topic" middleTopic := "persistent://public/default/mid-topic" diff --git a/manifests/crd.yaml b/manifests/crd.yaml index f45053934..2261b4715 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -170,6 +170,8 @@ spec: type: object producerConf: properties: + batchBuilder: + type: string cryptoConfig: properties: consumerCryptoFailureAction: @@ -9355,6 +9357,8 @@ spec: type: object producerConf: properties: + batchBuilder: + type: string cryptoConfig: properties: consumerCryptoFailureAction: @@ -13944,9 +13948,9 @@ metadata: annotations: controller-gen.kubebuilder.io/version: v0.2.4 creationTimestamp: null - name: functions.cloud.streamnative.io + name: functions.compute.functionmesh.io spec: - group: cloud.streamnative.io + group: compute.functionmesh.io names: kind: Function listKind: FunctionList @@ -14109,6 +14113,8 @@ spec: type: object producerConf: properties: + batchBuilder: + type: string cryptoConfig: properties: consumerCryptoFailureAction: @@ -14757,137 +14763,6 @@ spec: type: object type: array type: object - type: array - type: object - type: array - type: object - status: - description: FunctionMeshStatus defines the observed state of FunctionMesh - properties: - functionConditions: - additionalProperties: - description: The `Status` of a given `Condition` and the `Action` - needed to reach the `Status` - properties: - action: - type: string - condition: - type: string - status: - type: string - type: object - type: object - sinkConditions: - additionalProperties: - description: The `Status` of a given `Condition` and the `Action` - needed to reach the `Status` - properties: - action: - type: string - condition: - type: string - status: - type: string - type: object - type: object - sourceConditions: - additionalProperties: - description: The `Status` of a given `Condition` and the `Action` - needed to reach the `Status` - properties: - action: - type: string - condition: - type: string - status: - type: string - type: object - description: 'INSERT ADDITIONAL STATUS FIELD - define observed state - of cluster Important: Run "make" to regenerate code after modifying - this file' - type: object - type: object - type: object - version: v1alpha1 - versions: - - name: v1alpha1 - served: true - storage: true -status: - acceptedNames: - kind: "" - plural: "" - conditions: [] - storedVersions: [] ---- -apiVersion: apiextensions.k8s.io/v1beta1 -kind: CustomResourceDefinition -metadata: - annotations: - controller-gen.kubebuilder.io/version: v0.3.0 - creationTimestamp: null - name: functions.compute.functionmesh.io -spec: - group: compute.functionmesh.io - names: - kind: Function - listKind: FunctionList - plural: functions - singular: function - scope: Namespaced - subresources: - scale: - labelSelectorPath: .status.selector - specReplicasPath: .spec.replicas - statusReplicasPath: .status.replicas - status: {} - validation: - openAPIV3Schema: - description: Function is the Schema for the functions API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: FunctionSpec defines the desired state of Function - properties: - autoAck: - type: boolean - className: - type: string - cleanupSubscription: - type: boolean - clusterName: - type: string - deadLetterTopic: - type: string - forwardSourceMessageProperty: - type: boolean - funcConfig: - additionalProperties: - type: string - type: object - golang: - properties: - go: - type: string - goLocation: - type: string - type: object - input: - properties: - customSchemaSources: - additionalProperties: - type: string type: object annotations: additionalProperties: @@ -18572,9 +18447,9 @@ metadata: annotations: controller-gen.kubebuilder.io/version: v0.2.4 creationTimestamp: null - name: sinks.cloud.streamnative.io + name: sinks.compute.functionmesh.io spec: - group: cloud.streamnative.io + group: compute.functionmesh.io names: kind: Sink listKind: SinkList @@ -19324,267 +19199,6 @@ spec: type: object type: array type: object - volumeName: - description: VolumeName is the human-readable name of - the StorageOS volume. Volume names are only unique - within a namespace. - type: string - volumeNamespace: - description: VolumeNamespace specifies the scope of the - volume within StorageOS. If no namespace is specified - then the Pod's namespace will be used. This allows - the Kubernetes name scoping to be mirrored within StorageOS - for tighter integration. Set VolumeName to any name - to override the default behaviour. Set to "default" - if you are not using namespaces within StorageOS. Namespaces - that do not pre-exist within StorageOS will be created. - type: string - type: object - vsphereVolume: - description: VsphereVolume represents a vSphere volume attached - and mounted on kubelets host machine - properties: - fsType: - description: Filesystem type to mount. Must be a filesystem - type supported by the host operating system. Ex. "ext4", - "xfs", "ntfs". Implicitly inferred to be "ext4" if unspecified. - type: string - storagePolicyID: - description: Storage Policy Based Management (SPBM) profile - ID associated with the StoragePolicyName. - type: string - storagePolicyName: - description: Storage Policy Based Management (SPBM) profile - name. - type: string - volumePath: - description: Path that identifies vSphere volume vmdk - type: string - required: - - volumePath - type: object - required: - - name - type: object - type: array - type: object - processingGuarantee: - type: string - pulsar: - properties: - authConfig: - type: string - pulsarConfig: - description: The config map need to contain the following fields - webServiceURL brokerServiceURL - type: string - type: object - python: - properties: - py: - type: string - pyLocation: - type: string - type: object - replicas: - format: int32 - type: integer - resources: - description: ResourceRequirements describes the compute resource requirements. - properties: - limits: - additionalProperties: - anyOf: - - type: integer - - type: string - pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ - x-kubernetes-int-or-string: true - description: 'Limits describes the maximum amount of compute resources - allowed. More info: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/' - type: object - requests: - additionalProperties: - anyOf: - - type: integer - - type: string - pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ - x-kubernetes-int-or-string: true - description: 'Requests describes the minimum amount of compute resources - required. If Requests is omitted for a container, it defaults - to Limits if that is explicitly specified, otherwise to an implementation-defined - value. More info: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/' - type: object - type: object - retainKeyOrdering: - type: boolean - retainOrdering: - type: boolean - runtimeFlags: - type: string - secretsMap: - additionalProperties: - properties: - key: - type: string - path: - type: string - type: object - type: object - sinkType: - type: string - sourceType: - type: string - subscriptionName: - type: string - subscriptionPosition: - type: string - tenant: - type: string - timeout: - format: int32 - type: integer - volumeMounts: - items: - description: VolumeMount describes a mounting of a Volume within a - container. - properties: - mountPath: - description: Path within the container at which the volume should - be mounted. Must not contain ':'. - type: string - mountPropagation: - description: mountPropagation determines how mounts are propagated - from the host to container and the other way around. When not - set, MountPropagationNone is used. This field is beta in 1.10. - type: string - name: - description: This must match the Name of a Volume. - type: string - readOnly: - description: Mounted read-only if true, read-write otherwise (false - or unspecified). Defaults to false. - type: boolean - subPath: - description: Path within the volume from which the container's - volume should be mounted. Defaults to "" (volume's root). - type: string - subPathExpr: - description: Expanded path within the volume from which the container's - volume should be mounted. Behaves similarly to SubPath but environment - variable references $(VAR_NAME) are expanded using the container's - environment. Defaults to "" (volume's root). SubPathExpr and - SubPath are mutually exclusive. - type: string - required: - - mountPath - - name - type: object - type: array - type: object - status: - description: FunctionStatus defines the observed state of Function - properties: - conditions: - additionalProperties: - description: The `Status` of a given `Condition` and the `Action` - needed to reach the `Status` - properties: - action: - type: string - condition: - type: string - status: - type: string - type: object - description: 'INSERT ADDITIONAL STATUS FIELD - define observed state - of cluster Important: Run "make" to regenerate code after modifying - this file' - type: object - replicas: - format: int32 - type: integer - selector: - type: string - required: - - conditions - - replicas - - selector - type: object - type: object - version: v1alpha1 - versions: - - name: v1alpha1 - served: true - storage: true -status: - acceptedNames: - kind: "" - plural: "" - conditions: [] - storedVersions: [] ---- -apiVersion: apiextensions.k8s.io/v1beta1 -kind: CustomResourceDefinition -metadata: - annotations: - controller-gen.kubebuilder.io/version: v0.3.0 - creationTimestamp: null - name: sinks.compute.functionmesh.io -spec: - group: compute.functionmesh.io - names: - kind: Sink - listKind: SinkList - plural: sinks - singular: sink - scope: Namespaced - subresources: - scale: - labelSelectorPath: .status.selector - specReplicasPath: .spec.replicas - statusReplicasPath: .status.replicas - status: {} - validation: - openAPIV3Schema: - description: Topic is the Schema for the sinks API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: SinkSpec defines the desired state of Topic - properties: - autoAck: - type: boolean - className: - type: string - cleanupSubscription: - type: boolean - clusterName: - type: string - deadLetterTopic: - type: string - golang: - properties: - go: - type: string - goLocation: - type: string - type: object - input: - properties: - customSchemaSources: - additionalProperties: - type: string type: object annotations: additionalProperties: @@ -23348,6 +22962,8 @@ spec: type: object producerConf: properties: + batchBuilder: + type: string cryptoConfig: properties: consumerCryptoFailureAction: