Skip to content

Commit

Permalink
make this release compatible backwards
Browse files Browse the repository at this point in the history
  • Loading branch information
bwiercinski committed May 9, 2023
1 parent 30737a7 commit 6868d81
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,38 @@ import com.ocadotechnology.pass4s.core.Message
import com.ocadotechnology.pass4s.core.Message.Payload
import com.ocadotechnology.pass4s.kernel.Consumer
import com.ocadotechnology.pass4s.kernel.Sender
import io.circe.Decoder
import io.circe.Encoder
import io.circe.parser.decode
import io.circe.syntax._

object syntax {

implicit final class ProxiedSenderSyntax[F[_], P](private val underlying: Sender[F, Message[P]]) extends AnyVal {

def usingS3Proxy(config: S3ProxyConfig.Sender)(implicit s3Client: S3Client[F], uuid: UUIDGen[F], F: Monad[F]): Sender[F, Message[P]] =
@deprecated(
"This method is deprecated, because it uses `legacyFormatEncoder`. Switch to the new format using " +
"`usingS3ProxyCustomEncoder`, but before make sure that the consumer is using pass4s +0.4.x. " +
"In some future release legacy formats will be removed.",
"0.4.0"
)
def usingS3Proxy(config: S3ProxyConfig.Sender)(implicit s3Client: S3Client[F], uuid: UUIDGen[F], F: Monad[F]): Sender[F, Message[P]] = {
implicit val pointerEncoder: Encoder[PayloadS3Pointer] = PayloadS3Pointer.legacyFormatEncoder
usingS3ProxyCustomEncoder(config)
}

def usingS3ProxyCustomEncoder(
config: S3ProxyConfig.Sender
)(
implicit s3Client: S3Client[F],
uuid: UUIDGen[F],
F: Monad[F],
pointerEncoder: Encoder[PayloadS3Pointer]
): Sender[F, Message[P]] =
underlying.contramapM { msg =>
if (s3.shouldSendToS3(config.minPayloadSize, msg.payload))
for {
s3Key <- uuid.randomUUID.map(_.toString())
s3Key <- UUIDGen.randomString
_ <- s3Client.putObject(config.bucket, s3Key)(msg.payload.text)
pointer = PayloadS3Pointer(config.bucket, s3Key)
updatedMsg = s3.replacePayloadWithPointer(config, msg, pointer)
Expand All @@ -45,9 +65,15 @@ object syntax {

}

implicit final class ProxiedConsumerSyntax[F[_], A](private val underlying: Consumer[F, Payload]) extends AnyVal {
implicit final class ProxiedConsumerSyntax[F[_]](private val underlying: Consumer[F, Payload]) extends AnyVal {

def usingS3Proxy[P](config: S3ProxyConfig.Consumer)(implicit s3Client: S3Client[F], F: MonadThrow[F]): Consumer[F, Payload] =
def usingS3Proxy(
config: S3ProxyConfig.Consumer
)(
implicit s3Client: S3Client[F],
F: MonadThrow[F],
pointerDecoder: Decoder[PayloadS3Pointer]
): Consumer[F, Payload] =
underlying
.map(msg => (msg, decode[PayloadS3Pointer](msg.text).toOption))
.afterEach { case (_, maybePointer) =>
Expand Down Expand Up @@ -94,10 +120,16 @@ object syntax {
)
}

def replacePayloadWithPointer[P](config: S3ProxyConfig.Sender, msg: Message[P], pointer: PayloadS3Pointer): Message[P] = {
def replacePayloadWithPointer[P](
config: S3ProxyConfig.Sender,
msg: Message[P],
pointer: PayloadS3Pointer
)(
implicit pointerEncoder: Encoder[PayloadS3Pointer]
): Message[P] = {
val originalPayloadSize = calculatePayloadBytesSize(msg.payload)
val updatedMetadata = config.payloadSizeAttributeName.fold(msg.payload.metadata) { key =>
msg.payload.metadata + (key -> originalPayloadSize.toString())
msg.payload.metadata + (key -> originalPayloadSize.toString)
}
val updatedPayload = msg
.payload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ object S3ProxyCompatibilityTests extends MutableIOSuite {
}
} yield (amazonSqsExtendedClient, bucketName, queue)

val payload: Message.Payload = Message.Payload("body", Map("foo" -> "bar"))

test("message sent from Amazon SQS Extended Client can be handled by Consumer using S3Proxy addon") { env =>
bucketAndTopics(env).use { case (amazonSqsExtendedClient, bucketName, queueUrl) =>
implicit val s3Client: S3Client[IO] = env.s3Client
val payload = Payload("body", Map("foo" -> "bar"))

val amazonClientSendMessage = {
val metadata = payload.metadata.fmap(MessageAttributeValue.builder().dataType("String").stringValue(_).build()).asJava
Expand Down Expand Up @@ -103,11 +104,10 @@ object S3ProxyCompatibilityTests extends MutableIOSuite {
test("message sent from Sender using S3Proxy addon can be handled by Amazon SQS Extended Client") { env =>
bucketAndTopics(env).use { case (amazonSqsExtendedClient, bucketName, queueUrl) =>
implicit val s3Client: S3Client[IO] = env.s3Client
val payload = Payload("body", Map("foo" -> "bar"))

val pass4sSendMessage = {
val config = S3ProxyConfig.Sender.withSnsDefaults(bucketName).copy(minPayloadSize = None)
env.broker.sender.usingS3Proxy(config).sendOne(Message(payload, SqsDestination(queueUrl)))
env.broker.sender.usingS3ProxyCustomEncoder(config).sendOne(Message(payload, SqsDestination(queueUrl)))
}

val amazonClientReceiveMessage = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object S3ProxyTests extends MutableIOSuite {

val payload: Message.Payload = Message.Payload("body", Map("foo" -> "bar"))

test("Sender and consumer should do a full S3 round trip, message should not be deleted afterwards when deleting is disabled").usingRes {
test("Sender and consumer should do a full S3 round trip. Legacy format.").usingRes {
case (broker, implicit0(s3Client: S3Client[IO]), snsConnector, sqsConnector) =>
bucketAndTopics(s3Client, snsConnector, sqsConnector).use { case (bucketName, topicArn, queueUrl) =>
val consume1MessageFromQueue = {
Expand All @@ -54,6 +54,32 @@ object S3ProxyTests extends MutableIOSuite {
val senderConfig = S3ProxyConfig.Sender.withSnsDefaults(bucketName).copy(minPayloadSize = None)
broker.sender.usingS3Proxy(senderConfig).sendOne(Message(payload, SnsDestination(topicArn)).widen)
}

for {
_ <- sendMessageOnTopic
objects <- s3Client.listObjects(bucketName)
message <- consume1MessageFromQueue
afterObjects <- s3Client.listObjects(bucketName)
} yield expect.all(
message == payload,
objects.size == 1,
afterObjects.size == 1
)
}
}

test("Sender and consumer should do a full S3 round trip, message should not be deleted afterwards when deleting is disabled").usingRes {
case (broker, implicit0(s3Client: S3Client[IO]), snsConnector, sqsConnector) =>
bucketAndTopics(s3Client, snsConnector, sqsConnector).use { case (bucketName, topicArn, queueUrl) =>
val consume1MessageFromQueue = {
val consumerConfig = S3ProxyConfig.Consumer.withSnsDefaults().copy(shouldDeleteAfterProcessing = false)
val consumer: Consumer[IO, Message.Payload] = broker.consumer(SqsEndpoint(queueUrl)).usingS3Proxy(consumerConfig)
Consumer.toStreamSynchronous(consumer).head.compile.lastOrError
}
val sendMessageOnTopic = {
val senderConfig = S3ProxyConfig.Sender.withSnsDefaults(bucketName).copy(minPayloadSize = None)
broker.sender.usingS3ProxyCustomEncoder(senderConfig).sendOne(Message(payload, SnsDestination(topicArn)).widen)
}
for {
_ <- sendMessageOnTopic
objects <- s3Client.listObjects(bucketName)
Expand Down

0 comments on commit 6868d81

Please sign in to comment.