From 23362787aa3638cce36b3e25d2739d4e6fee27b1 Mon Sep 17 00:00:00 2001 From: Nacho Munoz Date: Fri, 29 Apr 2022 23:04:54 +0100 Subject: [PATCH] create worker task working --- Makefile | 6 +- apis/sample/v1alpha1/mytype_types.go | 26 +++- apis/sample/v1alpha1/zz_generated.deepcopy.go | 30 +++++ apis/v1alpha1/zz_generated.deepcopy.go | 1 + examples/provider/config.yaml | 6 +- examples/sample/mytype.yaml | 2 +- go.mod | 3 + go.sum | 8 +- internal/controller/mytype/mytype.go | 67 +++++++++- internal/controller/mytype/mytype_test.go | 122 +++++++++++++++++- ...sample.template.crossplane.io_mytypes.yaml | 35 +++++ 11 files changed, 286 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index fa237c9..45e2369 100644 --- a/Makefile +++ b/Makefile @@ -70,9 +70,9 @@ run: go.build $(GO_OUT_DIR)/$(PROJECT_NAME) --debug dev: $(KIND) $(KUBECTL) - @$(INFO) Creating kind cluster - @$(KIND) create cluster --name=$(PROJECT_NAME)-dev - @$(KUBECTL) cluster-info --context kind-$(PROJECT_NAME)-dev +# @$(INFO) Creating kind cluster +# @$(KIND) create cluster --name=$(PROJECT_NAME)-dev +# @$(KUBECTL) cluster-info --context kind-$(PROJECT_NAME)-dev @$(INFO) Installing Crossplane CRDs @$(KUBECTL) apply -k https://github.com/crossplane/crossplane//cluster?ref=master @$(INFO) Installing Provider SQL CRDs diff --git a/apis/sample/v1alpha1/mytype_types.go b/apis/sample/v1alpha1/mytype_types.go index d83da9d..714e6b7 100644 --- a/apis/sample/v1alpha1/mytype_types.go +++ b/apis/sample/v1alpha1/mytype_types.go @@ -35,16 +35,38 @@ type MyTypeObservation struct { ObservableField string `json:"observableField,omitempty"` } +// KafkaTopics are part of the desired state fields +type KafkaTopics struct { + NumPartitions int16 `json:"numPartitions,omitempty"` + ReplicationFactor int8 `json:"replicationFactor,omitempty"` +} + // A MyTypeSpec defines the desired state of a MyType. type MyTypeSpec struct { - xpv1.ResourceSpec `json:",inline"` - ForProvider MyTypeParameters `json:"forProvider"` + xpv1.ResourceSpec `json:",inline"` + Class string `json:"class,omitempty"` + DurationMs int64 `json:"durationMs,omitempty"` + ProducerNode string `json:"producerNode,omitempty"` + ClientNode string `json:"clientNode,omitempty"` + BootstrapServers string `json:"bootstrapServers,omitempty"` + TargetMessagesPerSec int32 `json:"targetMessagesPerSec,omitempty"` + MaxMessages int64 `json:"maxMessages,omitempty"` + ActiveTopics map[string]KafkaTopics `json:"activeTopics,omitempty"` + InactiveTopics map[string]KafkaTopics `json:"inactiveTopics,omitempty"` + ProducerConf map[string]string `json:"producerConf,omitempty"` + ConsumerConf map[string]string `json:"consumerConf,omitempty"` + CommonClientConf map[string]string `json:"commonClientConf,omitempty"` + AdminClientConf map[string]string `json:"adminClientConf,omitempty"` + ForProvider MyTypeParameters `json:"forProvider"` } // A MyTypeStatus represents the observed state of a MyType. type MyTypeStatus struct { xpv1.ResourceStatus `json:",inline"` AtProvider MyTypeObservation `json:"atProvider,omitempty"` + TaskStatus string `json:"taskStatus,omitempty"` + TaskId string `json:"taskId,omitempty"` + WorkerId uint64 `json:"workerId,omitempty"` } // +kubebuilder:object:root=true diff --git a/apis/sample/v1alpha1/zz_generated.deepcopy.go b/apis/sample/v1alpha1/zz_generated.deepcopy.go index 37bc7d1..abffaff 100644 --- a/apis/sample/v1alpha1/zz_generated.deepcopy.go +++ b/apis/sample/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,4 @@ +//go:build !ignore_autogenerated // +build !ignore_autogenerated /* @@ -24,6 +25,21 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaTopics) DeepCopyInto(out *KafkaTopics) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaTopics. +func (in *KafkaTopics) DeepCopy() *KafkaTopics { + if in == nil { + return nil + } + out := new(KafkaTopics) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MyType) DeepCopyInto(out *MyType) { *out = *in @@ -117,6 +133,20 @@ func (in *MyTypeParameters) DeepCopy() *MyTypeParameters { func (in *MyTypeSpec) DeepCopyInto(out *MyTypeSpec) { *out = *in in.ResourceSpec.DeepCopyInto(&out.ResourceSpec) + if in.ActiveTopics != nil { + in, out := &in.ActiveTopics, &out.ActiveTopics + *out = make(map[string]KafkaTopics, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.InactiveTopics != nil { + in, out := &in.InactiveTopics, &out.InactiveTopics + *out = make(map[string]KafkaTopics, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } out.ForProvider = in.ForProvider } diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index 9bc0ec6..458ab6d 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,4 @@ +//go:build !ignore_autogenerated // +build !ignore_autogenerated /* diff --git a/examples/provider/config.yaml b/examples/provider/config.yaml index 0dcfb2b..fcb6b45 100644 --- a/examples/provider/config.yaml +++ b/examples/provider/config.yaml @@ -1,11 +1,10 @@ apiVersion: v1 kind: Secret metadata: - namespace: crossplane-system name: example-provider-secret type: Opaque data: - # credentials: BASE64ENCODED_PROVIDER_CREDS + credentials: dGVzdAo= --- apiVersion: template.crossplane.io/v1alpha1 kind: ProviderConfig @@ -15,6 +14,7 @@ spec: credentials: source: Secret secretRef: - namespace: crossplane-system + namespace: tarasque name: example-provider-secret key: credentials +--- diff --git a/examples/sample/mytype.yaml b/examples/sample/mytype.yaml index a73ff5c..f2dfeee 100644 --- a/examples/sample/mytype.yaml +++ b/examples/sample/mytype.yaml @@ -6,4 +6,4 @@ spec: forProvider: configurableField: test providerConfigRef: - name: example \ No newline at end of file + name: example diff --git a/go.mod b/go.mod index e72b5dd..36e5e9e 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,10 @@ go 1.16 require ( github.com/crossplane/crossplane-runtime v0.15.0 github.com/crossplane/crossplane-tools v0.0.0-20210320162312-1baca298c527 + github.com/go-resty/resty/v2 v2.7.0 github.com/google/go-cmp v0.5.6 + github.com/google/uuid v1.1.2 + github.com/jarcoal/httpmock v1.1.0 github.com/pkg/errors v0.9.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 k8s.io/apimachinery v0.21.3 diff --git a/go.sum b/go.sum index cd3c53b..5c5c243 100644 --- a/go.sum +++ b/go.sum @@ -222,6 +222,8 @@ github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4= github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA= +github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= +github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gobuffalo/flect v0.1.5/go.mod h1:W3K3X9ksuZfir8f/LrfVtWmCDQFfayuylOJ7sz/Fj80= @@ -360,6 +362,8 @@ github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jarcoal/httpmock v1.1.0 h1:F47ChZj1Y2zFsCXxNkBPwNNKnAyOATcdQibk0qEdVCE= +github.com/jarcoal/httpmock v1.1.0/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= @@ -700,8 +704,9 @@ golang.org/x/net v0.0.0-20210224082022-3d97a244fca7/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/net v0.0.0-20211029224645-99673261e6eb h1:pirldcYWx7rx7kE5r+9WsOXPXK0+WH5+uZ7uPmJ44uM= +golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1106,7 +1111,6 @@ sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:w sigs.k8s.io/structured-merge-diff v0.0.0-20190817042607-6149e4549fca h1:6dsH6AYQWbyZmtttJNe8Gq1cXOeS1BdV3eW37zHilAQ= sigs.k8s.io/structured-merge-diff v0.0.0-20190817042607-6149e4549fca/go.mod h1:IIgPezJWb76P0hotTxzDbWsMYB8APh18qZnxkomBpxA= sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= -sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/structured-merge-diff/v4 v4.1.0/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/structured-merge-diff/v4 v4.1.2 h1:Hr/htKFmJEbtMgS/UD0N+gtgctAqz81t3nu+sPzynno= sigs.k8s.io/structured-merge-diff/v4 v4.1.2/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= diff --git a/internal/controller/mytype/mytype.go b/internal/controller/mytype/mytype.go index db8fe22..aeee37c 100644 --- a/internal/controller/mytype/mytype.go +++ b/internal/controller/mytype/mytype.go @@ -18,8 +18,15 @@ package mytype import ( "context" + "encoding/json" "fmt" + "math/rand" + "net/http" + "os" + "reflect" + "time" + "github.com/google/uuid" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" @@ -35,6 +42,7 @@ import ( "github.com/crossplane/provider-template/apis/sample/v1alpha1" apisv1alpha1 "github.com/crossplane/provider-template/apis/v1alpha1" + resty "github.com/go-resty/resty/v2" ) const ( @@ -43,14 +51,24 @@ const ( errGetPC = "cannot get ProviderConfig" errGetCreds = "cannot get credentials" - errNewClient = "cannot create new Service" + errNewClient = "cannot create new Service" + errNewTask = "cannot create new Task" + defaultAgentServiceUrl = "https://tarasque-agent.tarasque.svc.cluster.local" ) +func getEnvOrDefault(key, fallback string) string { + if value, ok := os.LookupEnv(key); ok { + return value + } + return fallback +} + // A NoOpService does nothing. type NoOpService struct{} var ( - newNoOpService = func(_ []byte) (interface{}, error) { return &NoOpService{}, nil } + newNoOpService = func(_ []byte) (*resty.Client, error) { return resty.New(), nil } + agentServiceUrl = getEnvOrDefault("SERVICE_URL", defaultAgentServiceUrl) ) // Setup adds a controller that reconciles MyType managed resources. @@ -77,12 +95,24 @@ func Setup(mgr ctrl.Manager, l logging.Logger, rl workqueue.RateLimiter) error { Complete(r) } +type WorkerTaskSpec struct { + v1alpha1.MyTypeSpec + StartMs int64 `json:"startMs,omitempty"` +} + +// KafkaTopics are part of the desired state fields +type WorkerTask struct { + TaskId string `json:"taskId,omitempty"` + WorkerId int64 `json:"workerId,omitempty"` + Spec WorkerTaskSpec `json:"spec,omitempty"` +} + // A connector is expected to produce an ExternalClient when its Connect method // is called. type connector struct { kube client.Client usage resource.Tracker - newServiceFn func(creds []byte) (interface{}, error) + newServiceFn func(creds []byte) (*resty.Client, error) } // Connect typically produces an ExternalClient by: @@ -124,7 +154,7 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E type external struct { // A 'client' used to connect to the external resource API. In practice this // would be something like an AWS SDK client. - service interface{} + service *resty.Client } func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.ExternalObservation, error) { @@ -134,13 +164,13 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex } // These fmt statements should be removed in the real implementation. - fmt.Printf("Observing: %+v", cr) + fmt.Printf("Observing: %+v \n", cr) return managed.ExternalObservation{ // Return false when the external resource does not exist. This lets // the managed resource reconciler know that it needs to call Create to // (re)create the resource, or that it has successfully been deleted. - ResourceExists: true, + ResourceExists: cr.Status.TaskId != "", // Return false when the external resource exists, but it not up to date // with the desired managed resource state. This lets the managed @@ -159,7 +189,30 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext return managed.ExternalCreation{}, errors.New(errNotMyType) } - fmt.Printf("Creating: %+v", cr) + payload := WorkerTask{Spec: WorkerTaskSpec{cr.Spec, time.Now().UnixMilli()}, WorkerId: rand.Int63(), TaskId: uuid.New().String()} + body, err := json.Marshal(payload) + if err != nil { + return managed.ExternalCreation{}, err + } + + mBody := make(map[string]interface{}, reflect.ValueOf(payload).NumField()) + if err := json.Unmarshal(body, &mBody); err != nil { + return managed.ExternalCreation{}, err + } + spec := mBody["spec"].(map[string]interface{}) + delete(spec, "providerConfigRef") + delete(spec, "forProvider") + delete(spec, "deletionPolicy") + fmt.Printf("Creating: %+v \n", string(body)) + resp, err := c.service.NewRequest(). + SetHeader("Accept", "application/json"). + SetHeader("Content-Type", "application/json"). + SetBody(mBody).Post(agentServiceUrl + "/agent/worker/create") + + fmt.Printf("Response: %v \n", string(resp.Body())) + if resp.StatusCode() != http.StatusOK || err != nil { + return managed.ExternalCreation{}, err + } return managed.ExternalCreation{ // Optionally return any details that may be required to connect to the diff --git a/internal/controller/mytype/mytype_test.go b/internal/controller/mytype/mytype_test.go index c99ffb6..3c683ca 100644 --- a/internal/controller/mytype/mytype_test.go +++ b/internal/controller/mytype/mytype_test.go @@ -18,13 +18,18 @@ package mytype import ( "context" + "net/http" "testing" + resty "github.com/go-resty/resty/v2" "github.com/google/go-cmp/cmp" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" "github.com/crossplane/crossplane-runtime/pkg/test" + "github.com/crossplane/provider-template/apis/sample/v1alpha1" + "github.com/jarcoal/httpmock" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // Unlike many Kubernetes projects Crossplane does not use third party testing @@ -37,7 +42,7 @@ import ( func TestObserve(t *testing.T) { type fields struct { - service interface{} + service *resty.Client } type args struct { @@ -49,6 +54,7 @@ func TestObserve(t *testing.T) { o managed.ExternalObservation err error } + client := resty.New() cases := map[string]struct { reason string @@ -56,7 +62,40 @@ func TestObserve(t *testing.T) { args args want want }{ - // TODO: Add test cases. + "test": { + "test", + fields{service: client}, + args{ + context.TODO(), + &v1alpha1.MyType{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "newBenchmark", + }, + Spec: v1alpha1.MyTypeSpec{ + Class: "org.apache.kafka.trogdor.workload.ProduceBenchSpec", + BootstrapServers: "localhost:9092", + ActiveTopics: map[string]v1alpha1.KafkaTopics{ + "myTopic": { + NumPartitions: 10, + ReplicationFactor: 3, + }, + }, + ForProvider: v1alpha1.MyTypeParameters{ + ConfigurableField: "example", + }, + }, + }, + }, + want{ + managed.ExternalObservation{ + ResourceExists: false, + ResourceUpToDate: true, + ConnectionDetails: managed.ConnectionDetails{}, + }, + nil, + }, + }, } for name, tc := range cases { @@ -72,3 +111,82 @@ func TestObserve(t *testing.T) { }) } } + +func TestCreate(t *testing.T) { + type fields struct { + service *resty.Client + } + + type args struct { + ctx context.Context + mg resource.Managed + } + + type want struct { + o managed.ExternalCreation + err error + } + client := resty.New() + httpmock.ActivateNonDefault(client.GetClient()) + defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("POST", agentServiceUrl+"/worker/create", + func(req *http.Request) (*http.Response, error) { + resp := httpmock.NewStringResponse(200, "OK") + + return resp, nil + }, + ) + cases := map[string]struct { + reason string + fields fields + args args + want want + }{ + "test": { + "test", + fields{service: client}, + + args{ + context.TODO(), + &v1alpha1.MyType{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "newBenchmark", + }, + Spec: v1alpha1.MyTypeSpec{ + Class: "org.apache.kafka.trogdor.workload.ProduceBenchSpec", + BootstrapServers: "localhost:9092", + ActiveTopics: map[string]v1alpha1.KafkaTopics{ + "myTopic": { + NumPartitions: 10, + ReplicationFactor: 3, + }, + }, + ForProvider: v1alpha1.MyTypeParameters{ + ConfigurableField: "example", + }, + }, + }, + }, + want{ + managed.ExternalCreation{ + ConnectionDetails: managed.ConnectionDetails{}, + }, + nil, + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + e := external{service: tc.fields.service} + got, err := e.Create(tc.args.ctx, tc.args.mg) + if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { + t.Errorf("\n%s\ne.Create(...): -want error, +got error:\n%s\n", tc.reason, diff) + } + if diff := cmp.Diff(tc.want.o, got); diff != "" { + t.Errorf("\n%s\ne.Create(...): -want, +got:\n%s\n", tc.reason, diff) + } + }) + } +} diff --git a/package/crds/sample.template.crossplane.io_mytypes.yaml b/package/crds/sample.template.crossplane.io_mytypes.yaml index 83b5fee..bac47be 100644 --- a/package/crds/sample.template.crossplane.io_mytypes.yaml +++ b/package/crds/sample.template.crossplane.io_mytypes.yaml @@ -53,6 +53,20 @@ spec: spec: description: A MyTypeSpec defines the desired state of a MyType. properties: + activeTopics: + additionalProperties: + description: KafkaTopics are part of the desired state fields + properties: + numPartitions: + type: integer + replicationFactor: + type: integer + type: object + type: object + bootstrapServers: + type: string + class: + type: string deletionPolicy: default: Delete description: DeletionPolicy specifies what will happen to the underlying @@ -62,6 +76,9 @@ spec: - Orphan - Delete type: string + durationMs: + format: int64 + type: integer forProvider: description: MyTypeParameters are the configurable fields of a MyType. properties: @@ -70,6 +87,21 @@ spec: required: - configurableField type: object + inactiveTopics: + additionalProperties: + description: KafkaTopics are part of the desired state fields + properties: + numPartitions: + type: integer + replicationFactor: + type: integer + type: object + type: object + maxMessages: + format: int64 + type: integer + producerNode: + type: string providerConfigRef: default: name: default @@ -94,6 +126,9 @@ spec: required: - name type: object + targetMessagesPerSec: + format: int32 + type: integer writeConnectionSecretToRef: description: WriteConnectionSecretToReference specifies the namespace and name of a Secret to which any connection details for this managed