Skip to content

Commit

Permalink
[#45] add KeyBased Batcher for function mesh (#122)
Browse files Browse the repository at this point in the history
* add BatchBuilder

* yaml update

* add tests
  • Loading branch information
freeznet authored Apr 20, 2021
1 parent 50b3aef commit d7deb90
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 396 deletions.
56 changes: 56 additions & 0 deletions .github/workflows/test-function-key-based-batcher.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
name: Precommit - Test Function Kind with Key Based Batcher
on:
pull_request:
branches:
- '*'
jobs:
lint-test:
runs-on: ubuntu-latest
steps:
- name: clean disk
run: |
sudo swapoff -a
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
ref: ${{ github.event.pull_request.head.sha }}

- name: Deploy k8s cluster env
run: |
.ci/deploy_pulsar_cluster.sh
- name: Build runner images
run: |
PULSAR_IMAGE_TAG=2.7.0 PULSAR_IMAGE=apachepulsar/pulsar-all KIND_PUSH=true images/build.sh
- name: Install operator-sdk
run: |
RELEASE_VERSION=v1.2.0
curl -LO https://github.com/operator-framework/operator-sdk/releases/download/${RELEASE_VERSION}/operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu
chmod +x operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu && sudo mkdir -p /usr/local/bin/ && sudo cp operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu /usr/local/bin/operator-sdk && rm operator-sdk-${RELEASE_VERSION}-x86_64-linux-gnu
# - name: Add CRD, controller or webhooks
# run: |
# operator-sdk create api --group cloud --version v1alpha1 --kind Function --resource=true --controller=true
# operator-sdk create webhook --group compute.functionmesh.io --version v1alpha1 --kind Function --defaulting --programmatic-validation

- name: Deploy function mesh server
run: |
make generate
make install
nohup make run &
- name: Test Function kind
run: |
kubectl apply -f config/samples/compute_v1alpha1_function_key_based_batcher.yaml
kubectl get all
- name: Verfy Function Mesh
run: |
.ci/verify_function_mesh.sh java-function-batcher-sample
1 change: 1 addition & 0 deletions api/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type ProducerConfig struct {
MaxPendingMessagesAcrossPartitions int32 `json:"maxPendingMessagesAcrossPartitions,omitempty"`
UseThreadLocalProducers bool `json:"useThreadLocalProducers,omitempty"`
CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty"`
BatchBuilder string `json:"batchBuilder,omitempty"`
}

type CryptoConfig struct {
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functionmeshes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ spec:
type: object
producerConf:
properties:
batchBuilder:
type: string
cryptoConfig:
properties:
consumerCryptoFailureAction:
Expand Down Expand Up @@ -5865,6 +5867,8 @@ spec:
type: object
producerConf:
properties:
batchBuilder:
type: string
cryptoConfig:
properties:
consumerCryptoFailureAction:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ spec:
type: object
producerConf:
properties:
batchBuilder:
type: string
cryptoConfig:
properties:
consumerCryptoFailureAction:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_sources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ spec:
type: object
producerConf:
properties:
batchBuilder:
type: string
cryptoConfig:
properties:
consumerCryptoFailureAction:
Expand Down
98 changes: 98 additions & 0 deletions config/samples/compute_v1alpha1_function_key_based_batcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
name: java-function-batcher-sample
namespace: default
spec:
className: org.apache.pulsar.functions.api.examples.ExclamationFunction
image: streamnative/pulsar-all:2.7.1
sourceType: java.lang.String
sinkType: java.lang.String
forwardSourceMessageProperty: true
MaxPendingAsyncRequests: 1000
replicas: 1
maxReplicas: 5
logTopic: persistent://public/default/logging-function-logs
input:
topics:
- persistent://public/default/java-function-input-topic
output:
topic: persistent://public/default/java-function-output-topic
producerConf:
batchBuilder: "KEY_BASED"
resources:
requests:
cpu: "0.1"
memory: 1G
limits:
cpu: "0.2"
memory: 1.1G
# each secret will be loaded ad an env variable from the `path` secret with the `key` in that secret in the name of `name`
secretsMap:
"name":
path: "test-secret"
key: "username"
"pwd":
path: "test-secret"
key: "password"
pulsar:
pulsarConfig: "test-pulsar"
#authConfig: "test-auth"
volumeMounts:
- mountPath: /cache
name: cache-volume
pod:
labels:
"locaction": "mtv"
annotations:
"managed-function": "true"
volumes:
- name: cache-volume
emptyDir: {}
imagePullSecrets:
- name: regcred
initContainers:
- name: init-function
image: busybox:1.28
command: ['sh', '-c', 'echo The app is running! && sleep 30']
sidecars:
- name: sidecar-function
image: busybox:1.28
command: ['sh', '-c', 'echo The app is running! && sleep 30000']
java:
jar: /pulsar/examples/api-examples.jar
jarLocation: ""
# use package name:
# jarLocation: function://public/default/nul-test-java-function@v1
# to be delete & use admission hook
clusterName: test-pulsar
autoAck: true
---
apiVersion: v1
kind: ConfigMap
metadata:
name: test-pulsar
data:
webServiceURL: http://test-pulsar-broker.default.svc.cluster.local:8080
brokerServiceURL: pulsar://test-pulsar-broker.default.svc.cluster.local:6650
#---
#apiVersion: v1
#kind: ConfigMap
#metadata:
# name: test-auth
#data:
# clientAuthenticationPlugin: "abc"
# clientAuthenticationParameters: "xyz"
# tlsTrustCertsFilePath: "uvw"
# useTls: "true"
# tlsAllowInsecureConnection: "false"
# tlsHostnameVerificationEnable: "true"
---
apiVersion: v1
data:
username: YWRtaW4=
password: MWYyZDFlMmU2N2Rm
kind: Secret
metadata:
name: test-secret
type: Opaque
12 changes: 12 additions & 0 deletions controllers/function_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ var _ = Describe("Function Controller (E2E)", func() {
})
})

var _ = Describe("Function Controller (Batcher)", func() {
Context("Function With Batcher Item", func() {
configs := makeSamplePulsarConfig()
function := makeFunctionSampleWithKeyBasedBatcher()

createFunctionConfigMap(configs)
createFunction(function)
deleteFunction(function)
deleteFunctionConfigMap(configs)
})
})

func createFunction(function *v1alpha1.Function) {
if function.Status.Conditions == nil {
function.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition)
Expand Down
2 changes: 2 additions & 0 deletions controllers/spec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func generateFunctionOutputSpec(function *v1alpha1.Function) *proto.SinkSpec {
MaxPendingMessagesAcrossPartitions: function.Spec.Output.ProducerConf.MaxPendingMessagesAcrossPartitions,
UseThreadLocalProducers: function.Spec.Output.ProducerConf.UseThreadLocalProducers,
CryptoSpec: generateCryptoSpec(function.Spec.Output.ProducerConf.CryptoConfig),
BatchBuilder: function.Spec.Output.ProducerConf.BatchBuilder,
}

sinkSpec.ProducerSpec = producerConfig
Expand Down Expand Up @@ -201,6 +202,7 @@ func generateSourceOutputSpec(source *v1alpha1.Source) *proto.SinkSpec {
MaxPendingMessagesAcrossPartitions: source.Spec.Output.ProducerConf.MaxPendingMessagesAcrossPartitions,
UseThreadLocalProducers: source.Spec.Output.ProducerConf.UseThreadLocalProducers,
CryptoSpec: cryptoSpec,
BatchBuilder: source.Spec.Output.ProducerConf.BatchBuilder,
}
}
return &proto.SinkSpec{
Expand Down
11 changes: 11 additions & 0 deletions controllers/test_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,17 @@ func makeFunctionSampleWithCryptoEnabled() *v1alpha1.Function {
return function
}

func makeFunctionSampleWithKeyBasedBatcher() *v1alpha1.Function {
function := makeFunctionSample(TestFunctionName)
function.Spec.Output = v1alpha1.OutputConf{
Topic: "persistent://public/default/java-function-output-topic",
ProducerConf: &v1alpha1.ProducerConfig{
BatchBuilder: "KEY_BASED",
},
}
return function
}

func makeFunctionMeshSample() *v1alpha1.FunctionMesh {
inputTopic := "persistent://public/default/functionmesh-input-topic"
middleTopic := "persistent://public/default/mid-topic"
Expand Down
Loading

0 comments on commit d7deb90

Please sign in to comment.