Skip to content

Commit

Permalink
enable source connector to set ForwardMessageProperty (#250)
Browse files Browse the repository at this point in the history
* enable source connector to set ForwardMessageProperty

* update mesh worker service

* handle nil
  • Loading branch information
nlu90 authored Aug 11, 2021
1 parent feff2d8 commit 2f2a69f
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 13 deletions.
16 changes: 8 additions & 8 deletions api/v1alpha1/source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ type SourceSpec struct {

// +kubebuilder:validation:Optional
// +kubebuilder:pruning:PreserveUnknownFields
SourceConfig *Config `json:"sourceConfig,omitempty"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"`
ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"`
RuntimeFlags string `json:"runtimeFlags,omitempty"`
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`

Pod PodPolicy `json:"pod,omitempty"`
SourceConfig *Config `json:"sourceConfig,omitempty"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"`
ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"`
RuntimeFlags string `json:"runtimeFlags,omitempty"`
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`
ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"`
Pod PodPolicy `json:"pod,omitempty"`

Messaging `json:",inline"`
Runtime `json:",inline"`
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/source_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (r *Source) Default() {
}
}

if r.Spec.ForwardSourceMessageProperty == nil {
trueVal := true
r.Spec.ForwardSourceMessageProperty = &trueVal
}

if r.Spec.Output.ProducerConf == nil {
producerConf := &ProducerConfig{
MaxPendingMessages: 1000,
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functionmeshes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4450,6 +4450,8 @@ spec:
type: string
clusterName:
type: string
forwardSourceMessageProperty:
type: boolean
golang:
properties:
go:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_sources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ spec:
type: string
clusterName:
type: string
forwardSourceMessageProperty:
type: boolean
golang:
properties:
go:
Expand Down
1 change: 1 addition & 0 deletions config/samples/compute_v1alpha1_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ spec:
useThreadLocalProducers: true
topic: persistent://public/default/destination
typeClassName: org.apache.pulsar.common.schema.KeyValue
forwardSourceMessageProperty: true
resources:
limits:
cpu: "0.2"
Expand Down
15 changes: 10 additions & 5 deletions controllers/spec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,17 @@ func generateSourceOutputSpec(source *v1alpha1.Source) *proto.SinkSpec {
BatchBuilder: source.Spec.Output.ProducerConf.BatchBuilder,
}
}
var forward = false
if source.Spec.ForwardSourceMessageProperty != nil {
forward = *source.Spec.ForwardSourceMessageProperty
}
return &proto.SinkSpec{
TypeClassName: source.Spec.Output.TypeClassName,
Topic: source.Spec.Output.Topic,
ProducerSpec: &producerSpec,
SerDeClassName: source.Spec.Output.SinkSerdeClassName,
SchemaType: source.Spec.Output.SinkSchemaType,
TypeClassName: source.Spec.Output.TypeClassName,
Topic: source.Spec.Output.Topic,
ProducerSpec: &producerSpec,
SerDeClassName: source.Spec.Output.SinkSerdeClassName,
SchemaType: source.Spec.Output.SinkSchemaType,
ForwardSourceMessageProperty: forward,
}
}

Expand Down
4 changes: 4 additions & 0 deletions manifests/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4448,6 +4448,8 @@ spec:
type: string
clusterName:
type: string
forwardSourceMessageProperty:
type: boolean
golang:
properties:
go:
Expand Down Expand Up @@ -11212,6 +11214,8 @@ spec:
type: string
clusterName:
type: string
forwardSourceMessageProperty:
type: boolean
golang:
properties:
go:
Expand Down
19 changes: 19 additions & 0 deletions mesh-worker-service/integration-tests/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

FROM streamnative/pulsar-all:2.8.0.7
COPY ./target/mesh-worker-service*.nar /pulsar/mesh-worker-service.nar
COPY ./integration-tests/docker/connectors.yaml /pulsar/conf/connectors.yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#

- id: pulsar-io-data-generator
name: data-generator
description: Test data generator connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S
if (Strings.isNotEmpty(functionDetails.getSink().getSchemaType())) {
v1alpha1SourceSpecOutput.setSinkSchemaType(functionDetails.getSink().getSchemaType());
}
v1alpha1SourceSpec.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty());
// process ProducerConf
V1alpha1SourceSpecOutputProducerConf v1alpha1SourceSpecOutputProducerConf
= new V1alpha1SourceSpecOutputProducerConf();
Expand Down

0 comments on commit 2f2a69f

Please sign in to comment.