Skip to content

Commit

Permalink
Adding sns sink to Integration Sink (#8365)
Browse files Browse the repository at this point in the history
adding sns sink

Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
  • Loading branch information
matzew authored Dec 3, 2024
1 parent d4357e3 commit 6e7e3d4
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 2 deletions.
33 changes: 33 additions & 0 deletions config/core/resources/integrationsink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,39 @@ spec:
description: The duration (in seconds) that the received messages are
hidden from subsequent retrieve requests after being retrieved by
a ReceiveMessage request.
sns:
type: object
properties:
arn:
type: string
title: Topic Name
description: The SNS topic name name or Amazon Resource Name (ARN).
region:
type: string
title: AWS Region
description: The AWS region to access.
autoCreateTopic:
type: boolean
title: Autocreate Topic
description: Setting the autocreation of the SNS topic.
default: false
sessionToken:
type: string
title: Session Token
description: Amazon AWS Session Token used when the user needs to assume
a IAM role.
uriEndpointOverride:
type: string
title: Overwrite Endpoint URI
description: The overriding endpoint URI. To use this option, you must
also select the `overrideEndpoint` option.
overrideEndpoint:
type: boolean
title: Endpoint Overwrite
description: Select this option to override the endpoint URI. To use
this option, you must also provide a URI for the `uriEndpointOverride`
option.
default: false
auth:
description: 'Auth configurations'
type: object
Expand Down
13 changes: 12 additions & 1 deletion docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -6373,13 +6373,24 @@ knative.dev/eventing/pkg/apis/common/integration/v1alpha1.AWSSQS
</tr>
<tr>
<td>
<code>sns</code><br/>
<em>
knative.dev/eventing/pkg/apis/common/integration/v1alpha1.AWSSNS
</em>
</td>
<td>
<p>SQS source configuration</p>
</td>
</tr>
<tr>
<td>
<code>auth</code><br/>
<em>
knative.dev/eventing/pkg/apis/common/integration/v1alpha1.Auth
</em>
</td>
<td>
<p>SQS source configuration</p>
<p>SNS source configuration</p>
</td>
</tr>
</tbody>
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/common/integration/v1alpha1/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@ type AWSDDBStreams struct {
StreamIteratorType string `json:"streamIteratorType,omitempty" default:"FROM_LATEST"` // Defines where in the DynamoDB stream to start getting records
Delay int `json:"delay,omitempty" default:"500"` // Delay in milliseconds before the next poll from the database
}

type AWSSNS struct {
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_SNS_SINK_TOPICNAMEORARN"` // SNS ARN
AutoCreateTopic bool `json:"autoCreateTopic" default:"false"` // Auto-create SNS topic
}
17 changes: 17 additions & 0 deletions pkg/apis/common/integration/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/sinks/v1alpha1/integration_sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Log struct {
type Aws struct {
S3 *v1alpha1.AWSS3 `json:"s3,omitempty"` // S3 source configuration
SQS *v1alpha1.AWSSQS `json:"sqs,omitempty"` // SQS source configuration
SNS *v1alpha1.AWSSNS `json:"sns,omitempty"` // SNS source configuration
Auth *v1alpha1.Auth `json:"auth,omitempty"`
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/sinks/v1alpha1/integration_sink_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ func TestAWS(t *testing.T) {
if ddbStreams.Region != "eu-north-1" {
t.Errorf("AWSDDBStreams.Region = %v, want 'eu-north-1'", ddbStreams.Region)
}

sns := v1alpha1.AWSSNS{
AWSCommon: v1alpha1.AWSCommon{
Region: "eu-north-1",
},
Arn: "example-topic",
}

if sns.Region != "eu-north-1" {
t.Errorf("AWSDDBStreams.Region = %v, want 'eu-north-1'", sns.Region)
}
}

// TestAuthFieldAccess tests the HasAuth method and field access in Auth struct
Expand Down
14 changes: 13 additions & 1 deletion pkg/apis/sinks/v1alpha1/integration_sink_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError
if spec.Aws.SQS != nil {
sinkSetCount++
}
if spec.Aws.SNS != nil {
sinkSetCount++
}
}

// Validate that only one sink field is set
Expand All @@ -53,7 +56,7 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError

// Only perform AWS-specific validation if exactly one AWS sink is configured
if sinkSetCount == 1 && spec.Aws != nil {
if spec.Aws.S3 != nil || spec.Aws.SQS != nil {
if spec.Aws.S3 != nil || spec.Aws.SQS != nil || spec.Aws.SNS != nil {
// Check that AWS Auth is properly configured
if !spec.Aws.Auth.HasAuth() {
errs = errs.Also(apis.ErrMissingField("aws.auth.secret.ref.name"))
Expand All @@ -79,6 +82,15 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError
errs = errs.Also(apis.ErrMissingField("aws.sqs.region"))
}
}
// Additional validation for AWS SNS required fields
if spec.Aws.SNS != nil {
if spec.Aws.SNS.Arn == "" {
errs = errs.Also(apis.ErrMissingField("aws.sns.arn"))
}
if spec.Aws.SNS.Region == "" {
errs = errs.Also(apis.ErrMissingField("aws.sns.region"))
}
}
}

return errs
Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,26 @@ func TestIntegrationSinkSpecValidation(t *testing.T) {
},
want: apis.ErrMissingField("aws.sqs.queueNameOrArn"),
},
{
name: "AWS SNS sink without TopicNameOrArn (invalid)",
spec: IntegrationSinkSpec{
Aws: &Aws{
SNS: &v1alpha1.AWSSNS{
AWSCommon: v1alpha1.AWSCommon{
Region: "us-east-1",
},
},
Auth: &v1alpha1.Auth{
Secret: &v1alpha1.Secret{
Ref: &v1alpha1.SecretReference{
Name: "aws-secret",
},
},
},
},
},
want: apis.ErrMissingField("aws.sns.arn"),
},
{
name: "no sink type specified (invalid)",
spec: IntegrationSinkSpec{},
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions pkg/reconciler/integration/sink/resources/container_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var sinkImageMap = map[string]string{
"log": "gcr.io/knative-nightly/log-sink:latest",
"aws-s3": "gcr.io/knative-nightly/aws-s3-sink:latest",
"aws-sqs": "gcr.io/knative-nightly/aws-sqs-sink:latest",
"aws-sns": "gcr.io/knative-nightly/aws-sns-sink:latest",
}

func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink) *appsv1.Deployment {
Expand Down Expand Up @@ -149,6 +150,18 @@ func makeEnv(sink *v1alpha1.IntegrationSink) []corev1.EnvVar {
return envVars
}

// AWS SNS environment variables
if sink.Spec.Aws != nil && sink.Spec.Aws.SNS != nil {
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_SNS_SINK", *sink.Spec.Aws.SNS)...)
if secretName != "" {
envVars = append(envVars, []corev1.EnvVar{
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SNS_SINK_ACCESSKEY", commonv1a1.AwsAccessKey, secretName),
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SNS_SINK_SECRETKEY", commonv1a1.AwsSecretKey, secretName),
}...)
}
return envVars
}

// If no valid configuration is found, return empty envVars
return envVars
}
Expand All @@ -161,6 +174,8 @@ func selectImage(sink *v1alpha1.IntegrationSink) string {
return sinkImageMap["aws-s3"]
case sink.Spec.Aws != nil && sink.Spec.Aws.SQS != nil:
return sinkImageMap["aws-sqs"]
case sink.Spec.Aws != nil && sink.Spec.Aws.SNS != nil:
return sinkImageMap["aws-sns"]
default:
return ""
}
Expand Down

0 comments on commit 6e7e3d4

Please sign in to comment.