Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter Policy - Basic field support #13

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ Example: `aws2-sqs://{{env:QUEUE_NAME}}?amazonSQSEndpoint={{env:SQS_ENDPOINT}}&.
Tested with [elasticmq](https://github.com/adamw/elasticmq).

```
docker run -d -p 9911:9911 -v "$PWD/example/config":/etc/sns jameskbride/local-sns
cd example
docker-compose up
```

## Development
Expand Down
21 changes: 19 additions & 2 deletions example/config/db.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,33 @@
"owner" : "",
"topicArn" : "arn:aws:sns:us-east-1:1465414804035:test1",
"protocol" : "sqs",
"endpoint" : "aws2-sqs://queue1?accessKey=xxx&secretKey=xxx&region=us-east-1&trustAllCertificates=true&overrideEndpoint=true&uriEndpointOverride=http://sqs:9324"
"endpoint" : "aws2-sqs://queue1?accessKey=xxx&secretKey=xxx&region=us-east-1&trustAllCertificates=true&overrideEndpoint=true&uriEndpointOverride=http://sqs:9324",
"subscriptionAttributes" : {
"FilterPolicy" : "{\"status\": [\"not_sent\", \"resend\"], \"amount\": [10.5], \"sold\": [true] }",
"FilterPolicyScope" : "MessageBody"
}
}, {
"arn" : "6df4ed2b-a650-4f7c-910a-1a89c7cae5a6",
"owner" : "",
"topicArn" : "arn:aws:sns:us-east-1:1465414804035:test1",
"protocol" : "file",
"endpoint" : "file://tmp/logs?fileName=messages.log&fileExist=Append&appendChars=\\n"
"endpoint" : "file://tmp/logs?fileName=messages.log&fileExist=Append&appendChars=\\n",
"subscriptionAttributes" : {
"FilterPolicy" : "{\"status\": [\"not_sent\", \"resend\"], \"amount\": [10.5], \"sold\": [true] }",
"RawMessageDelivery" : "true"
}
}, {
"arn" : "25da5e63-d5d3-469d-9e0c-e33539948bd1",
"owner" : "",
"topicArn" : "arn:aws:sns:us-east-1:1465414804035:test2",
"protocol" : "file",
"endpoint" : "file://tmp/logs?fileName=no-attributes.log&fileExist=Append&appendChars=\\n"
} ],
"topics" : [ {
"arn" : "arn:aws:sns:us-east-1:1465414804035:test1",
"name" : "test1"
}, {
"arn" : "arn:aws:sns:us-east-1:1465414804035:test2",
"name" : "test2"
} ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import java.io.Serializable

private const val ATTRIBUTE_PATTERN = ".*Attributes\\.entry\\.(\\d+)\\.(.*?)"

data class MessageAttribute(val name:String, val value:String): Serializable {
data class MessageAttribute(val name:String, val value:String, val dataType:String = "String"): Serializable {
companion object {
fun parse(attributes: List<MutableMap.MutableEntry<String, String>>): Map<String, MessageAttribute> {
fun parse(attributes: List<Map.Entry<String, String>>): Map<String, MessageAttribute> {
val pattern = ATTRIBUTE_PATTERN.toRegex()
val entryNumbers = attributes.map { attribute ->
val match = pattern.matchEntire(attribute.key)
Expand All @@ -24,7 +24,12 @@ data class MessageAttribute(val name:String, val value:String): Serializable {
it.key.matches(namePattern.toRegex())
}!!.value

mapOf(name to MessageAttribute(name, value))
val dataType = attributes.find {
val namePattern = ".*Attributes\\.entry\\.$entryNumber.Value.DataType"
it.key.matches(namePattern.toRegex())
}!!.value

mapOf(name to MessageAttribute(name, value, dataType))
}.fold(mapOf<String, MessageAttribute>()) { acc, map -> acc + map }

return messageAttributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ private const val SUBSCRIPTION_ATTRIBUTE_PATTERN = ".*Attributes\\.entry\\.(\\d+

data class SubscriptionAttribute(val name:String, val value:String) {
companion object {

const val FILTER_POLICY = "FilterPolicy"

fun parse(attributes: List<MutableMap.MutableEntry<String, String>>): Map<String, String> {
val pattern = SUBSCRIPTION_ATTRIBUTE_PATTERN.toRegex()
val entryNumbers = attributes.map { attribute ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.jameskbride.localsns.routes.topics
import com.google.gson.Gson
import com.jameskbride.localsns.*
import com.jameskbride.localsns.models.*
import com.jameskbride.localsns.models.SubscriptionAttribute.Companion.FILTER_POLICY
import io.vertx.core.json.JsonObject
import io.vertx.ext.web.RoutingContext
import io.vertx.kotlin.core.json.get
Expand All @@ -22,9 +23,10 @@ val publishRoute: (RoutingContext) -> Unit = route@{ ctx: RoutingContext ->
val topicArn = getTopicArn(topicArnFormAttribute, targetArnFormAttribute)
val message = getFormAttribute(ctx, "Message")
val messageStructure = getFormAttribute(ctx, "MessageStructure")
val attributes = ctx.request().formAttributes()
val formAttributes = ctx.request().formAttributes()
logger.debug("MessageAttributes passed to publish: {}", formAttributes)
val attributes = formAttributes
.filter { it.key.startsWith("MessageAttributes.entry") }
.filterNot { it.key.matches(".*\\.DataType.*".toRegex()) }
val vertx = ctx.vertx()

if (topicArn == null) {
Expand Down Expand Up @@ -135,7 +137,6 @@ private fun publishBasicMessage(
) {
subscriptions.forEach { subscription ->
try {
logger.info("Message to publish: $message")
publishMessage(subscription, message, messageAttributes, producerTemplate, logger)
} catch (e: Exception) {
logger.error("An error occurred when publishing to: ${subscription.endpoint}", e)
Expand All @@ -154,6 +155,17 @@ private fun publishMessage(
producer: ProducerTemplate,
logger: Logger
) {
if (subscription.subscriptionAttributes.containsKey(FILTER_POLICY)) {
val filterPolicyScope = subscription.subscriptionAttributes.get("FilterPolicyScope")
val match = when (filterPolicyScope) {
"MessageBody" -> matchesFilterPolicy(subscription, message)
else -> matchesFilterPolicy(subscription, messageAttributes)
}
if (!match) {
return
}
}

val headers = messageAttributes.map { it.key to it.value.value }.toMap() +
mapOf(
"x-amz-sns-message-type" to "Notification",
Expand Down Expand Up @@ -183,6 +195,53 @@ private fun publishMessage(
}
}

private fun matchesFilterPolicy(
subscription: Subscription,
messageAttributes: Map<String, MessageAttribute>
): Boolean {
val filterPolicySubscriptionAttribute = subscription.subscriptionAttributes[FILTER_POLICY]
val filterPolicy = JsonObject(filterPolicySubscriptionAttribute)
val matched = filterPolicy.map.all {
if (!messageAttributes.containsKey(it.key)) {
false
} else {
val permittedValues = it.value as List<*>
val messageAttribute = messageAttributes[it.key]
when (messageAttribute!!.dataType) {
"Number" -> {
val parsedAttribute = messageAttribute.value.toDouble()
permittedValues.contains(parsedAttribute)
}
else -> {
permittedValues.any {permittedValue ->
when (permittedValue) {
(permittedValue is Boolean) -> permittedValue.toString() == messageAttribute.value
else -> permittedValue == messageAttribute.value
}
}
}
}
}
}
return matched
}

private fun matchesFilterPolicy(subscription: Subscription, message:String): Boolean {
val filterPolicySubscriptionAttribute = subscription.subscriptionAttributes[FILTER_POLICY]
val filterPolicy = JsonObject(filterPolicySubscriptionAttribute)
val messageJson = JsonObject(message)
val matched = filterPolicy.map.all {
if (!messageJson.containsKey(it.key)) {
false
} else {
val permittedValues = it.value as List<*>
val messageAttribute = messageJson.getValue(it.key)
permittedValues.contains(messageAttribute!!)
}
}
return matched
}

private fun publishToSqs(
subscription: Subscription,
message: String,
Expand Down Expand Up @@ -224,12 +283,7 @@ private fun publishToLambda(
val record = LambdaRecord("aws:sns", subscription.arn, 1.0, snsMessage)
val event = LambdaEvent(listOf(record))
val messageToPublish = gson.toJson(event)
producer.asyncRequestBodyAndHeaders(
subscription.decodedEndpointUrl(),
messageToPublish,
headers + mapOf("Content-Type" to "application/json")
)
.exceptionally { logger.error("Error publishing message $message, to subscription: $subscription", it) }
publish(subscription, messageToPublish, headers + mapOf("Content-Type" to "application/json"), producer, logger)
}

private fun publishToHttp(
Expand Down Expand Up @@ -274,6 +328,7 @@ private fun publish(
producer: ProducerTemplate,
logger: Logger
) {
logger.info("Publishing to subscription: ${subscription.arn}: message: $message, headers: $headers")
producer.asyncRequestBodyAndHeaders(subscription.decodedEndpointUrl(), message, headers)
.exceptionally { logger.error("Error publishing message $message, to subscription: $subscription", it) }
}
Expand Down
43 changes: 43 additions & 0 deletions src/test/kotlin/com/jameskbride/localsns/MessageAttributeTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.jameskbride.localsns

import com.jameskbride.localsns.models.MessageAttribute
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

class MessageAttributeTest {

@Test
fun `it can parse StringValue attributes`() {
val rawAttributes = mapOf(
"MessageAttributes.entry.1.Name" to "status",
"MessageAttributes.entry.1.Value.DataType" to "String",
"MessageAttributes.entry.1.Value.StringValue" to "not_sent"
).entries.toList()

val messageAttributes = MessageAttribute.parse(rawAttributes)
assertTrue(messageAttributes.keys.contains("status"))

val messageAttribute = messageAttributes["status"]
assertEquals("status", messageAttribute!!.name)
assertEquals("String", messageAttribute.dataType)
assertEquals("not_sent", messageAttribute.value)
}

@Test
fun `it can parse Number attributes`() {
val rawAttributes = mapOf(
"MessageAttributes.entry.1.Name" to "amount",
"MessageAttributes.entry.1.Value.DataType" to "Number",
"MessageAttributes.entry.1.Value.StringValue" to "10.56"
).entries.toList()

val messageAttributes = MessageAttribute.parse(rawAttributes)
assertTrue(messageAttributes.keys.contains("amount"))

val messageAttribute = messageAttributes["amount"]
assertEquals("amount", messageAttribute!!.name)
assertEquals("Number", messageAttribute.dataType)
assertEquals("10.56", messageAttribute.value)
}
}
Loading
Loading