Skip to content

Commit

Permalink
Add conversions functions
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Oct 9, 2024
1 parent 3ee8c9b commit f51c244
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 20 deletions.
68 changes: 52 additions & 16 deletions control-plane/pkg/apis/bindings/v1beta1/kafka_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,63 @@ package v1beta1

import (
"context"
"fmt"

"knative.dev/pkg/apis"
v1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1"
)

// ConvertTo implements apis.Convertible
func (source *KafkaBinding) ConvertTo(_ context.Context, sink apis.Convertible) error {
return fmt.Errorf("v1beta1 is the highest known version, got: %T", sink)
// ConvertToV1 converts v1beta1 to v1.
func (source *KafkaAuthSpec) ConvertToV1(_ context.Context) *v1.KafkaAuthSpec {
if source == nil {
return nil
}
sink := &v1.KafkaAuthSpec{
BootstrapServers: source.BootstrapServers,
Net: v1.KafkaNetSpec{
SASL: v1.KafkaSASLSpec{
Enable: source.Net.SASL.Enable,
User: v1.SecretValueFromSource{
SecretKeyRef: source.Net.SASL.User.SecretKeyRef,
},
Password: v1.SecretValueFromSource{
SecretKeyRef: source.Net.SASL.Password.SecretKeyRef,
},
Type: v1.SecretValueFromSource{
SecretKeyRef: source.Net.SASL.Type.SecretKeyRef,
},
},
TLS: v1.KafkaTLSSpec{
Enable: source.Net.TLS.Enable,
Cert: v1.SecretValueFromSource{
SecretKeyRef: source.Net.TLS.Cert.SecretKeyRef,
},
Key: v1.SecretValueFromSource{
SecretKeyRef: source.Net.TLS.Key.SecretKeyRef,
},
CACert: v1.SecretValueFromSource{
SecretKeyRef: source.Net.TLS.CACert.SecretKeyRef,
},
},
},
}
return sink
}

// ConvertFrom implements apis.Convertible
func (sink *KafkaBinding) ConvertFrom(_ context.Context, source apis.Convertible) error {
return fmt.Errorf("v1beta1 is the highest known version, got: %T", source)
}
// ConvertFromV1 converts v1 to v1beta1
func (sink *KafkaAuthSpec) ConvertFromV1(source *v1.KafkaAuthSpec) {
if source == nil {
return
}
sink.BootstrapServers = source.BootstrapServers

// ConvertTo implements apis.Convertible
func (source *KafkaAuthSpec) ConvertTo(_ context.Context, sink apis.Convertible) error {
return fmt.Errorf("v1beta1 is the highest known version, got: %T", sink)
}
sink.Net.SASL.Enable = source.Net.SASL.Enable
sink.Net.SASL.Type.SecretKeyRef = source.Net.SASL.Type.SecretKeyRef
sink.Net.SASL.User.SecretKeyRef = source.Net.SASL.User.SecretKeyRef
sink.Net.SASL.Password.SecretKeyRef = source.Net.SASL.Password.SecretKeyRef

sink.Net.TLS.Enable = source.Net.TLS.Enable
sink.Net.TLS.Key.SecretKeyRef = source.Net.TLS.Key.SecretKeyRef
sink.Net.TLS.Cert.SecretKeyRef = source.Net.TLS.Cert.SecretKeyRef
sink.Net.TLS.CACert.SecretKeyRef = source.Net.TLS.CACert.SecretKeyRef

// ConvertFrom implements apis.Convertible
func (sink *KafkaAuthSpec) ConvertFrom(_ context.Context, source apis.Convertible) error {
return fmt.Errorf("v1beta1 is the highest known version, got: %T", source)
return
}
61 changes: 57 additions & 4 deletions control-plane/pkg/apis/sources/v1beta1/kafka_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,67 @@ import (
"fmt"

"knative.dev/pkg/apis"

bindingsv1beta1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"
v1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1"
)

// ConvertTo implements apis.Convertible
func (source *KafkaSource) ConvertTo(ctx context.Context, sink apis.Convertible) error {
return fmt.Errorf("v1beta1 is the highest known version, got: %T", sink)
func (source *KafkaSource) ConvertTo(ctx context.Context, to apis.Convertible) error {
switch sink := to.(type) {
case *v1.KafkaSource:
source.ObjectMeta.DeepCopyInto(&sink.ObjectMeta)
sink.Spec = v1.KafkaSourceSpec{
Consumers: source.Spec.Consumers,
KafkaAuthSpec: *source.Spec.KafkaAuthSpec.ConvertToV1(ctx),
Topics: source.Spec.Topics,
ConsumerGroup: source.Spec.ConsumerGroup,
InitialOffset: v1.Offset(source.Spec.InitialOffset),
Delivery: source.Spec.Delivery,
Ordering: (*v1.DeliveryOrdering)(source.Spec.Ordering),
SourceSpec: source.Spec.SourceSpec,
}
sink.Status = v1.KafkaSourceStatus{
SourceStatus: *source.Status.SourceStatus.DeepCopy(),
Consumers: source.Status.Consumers,
Selector: source.Status.Selector,
Claims: source.Status.Claims,
Placeable: source.Status.Placeable,
}
return nil
default:
return fmt.Errorf("unknown version, got: %T", sink)
}
}

// ConvertFrom implements apis.Convertible
func (sink *KafkaSource) ConvertFrom(ctx context.Context, source apis.Convertible) error {
return fmt.Errorf("v1beta1 is the highest known version, got: %T", source)
func (sink *KafkaSource) ConvertFrom(ctx context.Context, from apis.Convertible) error {

switch source := from.(type) {
case *v1.KafkaSource:
source.ObjectMeta.DeepCopyInto(&sink.ObjectMeta)
authSpec := bindingsv1beta1.KafkaAuthSpec{}
authSpec.ConvertFromV1(&source.Spec.KafkaAuthSpec)
sink.Spec = KafkaSourceSpec{
Consumers: source.Spec.Consumers,
KafkaAuthSpec: authSpec,
Topics: source.Spec.Topics,
ConsumerGroup: source.Spec.ConsumerGroup,
InitialOffset: Offset(source.Spec.InitialOffset),
Delivery: source.Spec.Delivery,
Ordering: (*DeliveryOrdering)(source.Spec.Ordering),
SourceSpec: source.Spec.SourceSpec,
}
sink.Status = KafkaSourceStatus{
SourceStatus: source.Status.SourceStatus,
Consumers: source.Status.Consumers,
Selector: source.Status.Selector,
Claims: source.Status.Claims,
Placeable: source.Status.Placeable,
}

return nil
default:
return fmt.Errorf("unknown version, got: %T", source)
}
}

0 comments on commit f51c244

Please sign in to comment.