diff --git a/config/core/resources/integrationsink.yaml b/config/core/resources/integrationsink.yaml index b123f03fdee..80a790c9f6a 100644 --- a/config/core/resources/integrationsink.yaml +++ b/config/core/resources/integrationsink.yaml @@ -297,6 +297,39 @@ spec: description: The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. + sns: + type: object + properties: + arn: + type: string + title: Topic Name + description: The SNS topic name name or Amazon Resource Name (ARN). + region: + type: string + title: AWS Region + description: The AWS region to access. + autoCreateTopic: + type: boolean + title: Autocreate Topic + description: Setting the autocreation of the SNS topic. + default: false + sessionToken: + type: string + title: Session Token + description: Amazon AWS Session Token used when the user needs to assume + a IAM role. + uriEndpointOverride: + type: string + title: Overwrite Endpoint URI + description: The overriding endpoint URI. To use this option, you must + also select the `overrideEndpoint` option. + overrideEndpoint: + type: boolean + title: Endpoint Overwrite + description: Select this option to override the endpoint URI. To use + this option, you must also provide a URI for the `uriEndpointOverride` + option. + default: false auth: description: 'Auth configurations' type: object diff --git a/docs/eventing-api.md b/docs/eventing-api.md index d6196c22223..e80b1c1065b 100644 --- a/docs/eventing-api.md +++ b/docs/eventing-api.md @@ -6373,13 +6373,24 @@ knative.dev/eventing/pkg/apis/common/integration/v1alpha1.AWSSQS +sns
+ +knative.dev/eventing/pkg/apis/common/integration/v1alpha1.AWSSNS + + + +

SQS source configuration

+ + + + auth
knative.dev/eventing/pkg/apis/common/integration/v1alpha1.Auth -

SQS source configuration

+

SNS source configuration

diff --git a/pkg/apis/common/integration/v1alpha1/aws.go b/pkg/apis/common/integration/v1alpha1/aws.go index 1fd7ff988f4..80ddb025e6e 100644 --- a/pkg/apis/common/integration/v1alpha1/aws.go +++ b/pkg/apis/common/integration/v1alpha1/aws.go @@ -70,3 +70,9 @@ type AWSDDBStreams struct { StreamIteratorType string `json:"streamIteratorType,omitempty" default:"FROM_LATEST"` // Defines where in the DynamoDB stream to start getting records Delay int `json:"delay,omitempty" default:"500"` // Delay in milliseconds before the next poll from the database } + +type AWSSNS struct { + AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON + Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_SNS_SINK_TOPICNAMEORARN"` // SNS ARN + AutoCreateTopic bool `json:"autoCreateTopic" default:"false"` // Auto-create SNS topic +} diff --git a/pkg/apis/common/integration/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/common/integration/v1alpha1/zz_generated.deepcopy.go index 5d0f7dde59c..9cf353963a1 100644 --- a/pkg/apis/common/integration/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/common/integration/v1alpha1/zz_generated.deepcopy.go @@ -71,6 +71,23 @@ func (in *AWSS3) DeepCopy() *AWSS3 { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AWSSNS) DeepCopyInto(out *AWSSNS) { + *out = *in + out.AWSCommon = in.AWSCommon + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AWSSNS. +func (in *AWSSNS) DeepCopy() *AWSSNS { + if in == nil { + return nil + } + out := new(AWSSNS) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AWSSQS) DeepCopyInto(out *AWSSQS) { *out = *in diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_types.go b/pkg/apis/sinks/v1alpha1/integration_sink_types.go index 5e2dbb46fd0..efc15e62b89 100644 --- a/pkg/apis/sinks/v1alpha1/integration_sink_types.go +++ b/pkg/apis/sinks/v1alpha1/integration_sink_types.go @@ -76,6 +76,7 @@ type Log struct { type Aws struct { S3 *v1alpha1.AWSS3 `json:"s3,omitempty"` // S3 source configuration SQS *v1alpha1.AWSSQS `json:"sqs,omitempty"` // SQS source configuration + SNS *v1alpha1.AWSSNS `json:"sns,omitempty"` // SNS source configuration Auth *v1alpha1.Auth `json:"auth,omitempty"` } diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go index 27efddffeca..efd54d0f61e 100644 --- a/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go +++ b/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go @@ -87,6 +87,17 @@ func TestAWS(t *testing.T) { if ddbStreams.Region != "eu-north-1" { t.Errorf("AWSDDBStreams.Region = %v, want 'eu-north-1'", ddbStreams.Region) } + + sns := v1alpha1.AWSSNS{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "eu-north-1", + }, + Arn: "example-topic", + } + + if sns.Region != "eu-north-1" { + t.Errorf("AWSDDBStreams.Region = %v, want 'eu-north-1'", sns.Region) + } } // TestAuthFieldAccess tests the HasAuth method and field access in Auth struct diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_validation.go b/pkg/apis/sinks/v1alpha1/integration_sink_validation.go index c96b83d7dbe..9a0ae4dd7a8 100644 --- a/pkg/apis/sinks/v1alpha1/integration_sink_validation.go +++ b/pkg/apis/sinks/v1alpha1/integration_sink_validation.go @@ -42,6 +42,9 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError if spec.Aws.SQS != nil { sinkSetCount++ } + if spec.Aws.SNS != nil { + sinkSetCount++ + } } // Validate that only one sink field is set @@ -53,7 +56,7 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError // Only perform AWS-specific validation if exactly one AWS sink is configured if sinkSetCount == 1 && spec.Aws != nil { - if spec.Aws.S3 != nil || spec.Aws.SQS != nil { + if spec.Aws.S3 != nil || spec.Aws.SQS != nil || spec.Aws.SNS != nil { // Check that AWS Auth is properly configured if !spec.Aws.Auth.HasAuth() { errs = errs.Also(apis.ErrMissingField("aws.auth.secret.ref.name")) @@ -79,6 +82,15 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError errs = errs.Also(apis.ErrMissingField("aws.sqs.region")) } } + // Additional validation for AWS SNS required fields + if spec.Aws.SNS != nil { + if spec.Aws.SNS.Arn == "" { + errs = errs.Also(apis.ErrMissingField("aws.sns.arn")) + } + if spec.Aws.SNS.Region == "" { + errs = errs.Also(apis.ErrMissingField("aws.sns.region")) + } + } } return errs diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go index cba73a420b7..a976307d3ed 100644 --- a/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go +++ b/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go @@ -149,6 +149,26 @@ func TestIntegrationSinkSpecValidation(t *testing.T) { }, want: apis.ErrMissingField("aws.sqs.queueNameOrArn"), }, + { + name: "AWS SNS sink without TopicNameOrArn (invalid)", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + SNS: &v1alpha1.AWSSNS{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + }, + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: apis.ErrMissingField("aws.sns.arn"), + }, { name: "no sink type specified (invalid)", spec: IntegrationSinkSpec{}, diff --git a/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go index 3dff23adf1d..a0265e79c00 100644 --- a/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go @@ -40,6 +40,11 @@ func (in *Aws) DeepCopyInto(out *Aws) { *out = new(integrationv1alpha1.AWSSQS) **out = **in } + if in.SNS != nil { + in, out := &in.SNS, &out.SNS + *out = new(integrationv1alpha1.AWSSNS) + **out = **in + } if in.Auth != nil { in, out := &in.Auth, &out.Auth *out = new(integrationv1alpha1.Auth) diff --git a/pkg/reconciler/integration/sink/resources/container_image.go b/pkg/reconciler/integration/sink/resources/container_image.go index 3ef22ccc4e8..65091c17596 100644 --- a/pkg/reconciler/integration/sink/resources/container_image.go +++ b/pkg/reconciler/integration/sink/resources/container_image.go @@ -31,6 +31,7 @@ var sinkImageMap = map[string]string{ "log": "gcr.io/knative-nightly/log-sink:latest", "aws-s3": "gcr.io/knative-nightly/aws-s3-sink:latest", "aws-sqs": "gcr.io/knative-nightly/aws-sqs-sink:latest", + "aws-sns": "gcr.io/knative-nightly/aws-sns-sink:latest", } func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink) *appsv1.Deployment { @@ -149,6 +150,18 @@ func makeEnv(sink *v1alpha1.IntegrationSink) []corev1.EnvVar { return envVars } + // AWS SNS environment variables + if sink.Spec.Aws != nil && sink.Spec.Aws.SNS != nil { + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_SNS_SINK", *sink.Spec.Aws.SNS)...) + if secretName != "" { + envVars = append(envVars, []corev1.EnvVar{ + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SNS_SINK_ACCESSKEY", commonv1a1.AwsAccessKey, secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SNS_SINK_SECRETKEY", commonv1a1.AwsSecretKey, secretName), + }...) + } + return envVars + } + // If no valid configuration is found, return empty envVars return envVars } @@ -161,6 +174,8 @@ func selectImage(sink *v1alpha1.IntegrationSink) string { return sinkImageMap["aws-s3"] case sink.Spec.Aws != nil && sink.Spec.Aws.SQS != nil: return sinkImageMap["aws-sqs"] + case sink.Spec.Aws != nil && sink.Spec.Aws.SNS != nil: + return sinkImageMap["aws-sns"] default: return "" }