Skip to content

Commit

Permalink
Add topic & content-type in message adapter (#48)
Browse files Browse the repository at this point in the history
* Add topic & content-type in message adapter
  • Loading branch information
deepanshu42 authored Oct 30, 2022
1 parent fba09e5 commit 09892e0
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 22 deletions.
5 changes: 3 additions & 2 deletions courier-core/api/courier-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ public final class com/gojek/courier/Message$Bytes : com/gojek/courier/Message {
}

public abstract interface class com/gojek/courier/MessageAdapter {
public abstract fun fromMessage (Lcom/gojek/courier/Message;)Ljava/lang/Object;
public abstract fun toMessage (Ljava/lang/Object;)Lcom/gojek/courier/Message;
public abstract fun contentType ()Ljava/lang/String;
public abstract fun fromMessage (Ljava/lang/String;Lcom/gojek/courier/Message;)Ljava/lang/Object;
public abstract fun toMessage (Ljava/lang/String;Ljava/lang/Object;)Lcom/gojek/courier/Message;
}

public abstract interface class com/gojek/courier/MessageAdapter$Factory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import java.lang.reflect.Type
interface MessageAdapter<T> {

/** Returns an object of type `T` that represents a [Message]. */
fun fromMessage(message: Message): T
fun fromMessage(topic: String, message: Message): T

/** Returns a [Message] that represents [data]. */
fun toMessage(data: T): Message
fun toMessage(topic: String, data: T): Message

/** Returns the content type supported by this adapter. */
fun contentType(): String

/** Creates [MessageAdapter] instances based on a type and target usage. */
interface Factory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ private class GsonMessageAdapter<T> constructor(
private val typeAdapter: TypeAdapter<T>
) : MessageAdapter<T> {

override fun fromMessage(message: Message): T {
override fun fromMessage(topic: String, message: Message): T {
val stringValue = when (message) {
is Message.Bytes -> String(message.value)
}
val jsonReader = gson.newJsonReader(StringReader(stringValue))
return typeAdapter.read(jsonReader)!!
}

override fun toMessage(data: T): Message {
override fun toMessage(topic: String, data: T): Message {
val buffer = Buffer()
val writer = OutputStreamWriter(buffer.outputStream(), UTF_8)
val jsonWriter = gson.newJsonWriter(writer)
Expand All @@ -36,6 +36,8 @@ private class GsonMessageAdapter<T> constructor(
val stringValue = buffer.readByteString().utf8()
return Message.Bytes(stringValue.toByteArray())
}

override fun contentType() = "application/json"
}

class GsonMessageAdapterFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ private class MoshiMessageAdapter<T> constructor(
private val jsonAdapter: JsonAdapter<T>
) : MessageAdapter<T> {

override fun fromMessage(message: Message): T {
override fun fromMessage(topic: String, message: Message): T {
val stringValue = when (message) {
is Message.Bytes -> {
val byteString = ByteString.of(message.value, 0, message.value.size)
Expand All @@ -32,11 +32,13 @@ private class MoshiMessageAdapter<T> constructor(
return jsonAdapter.fromJson(stringValue)!!
}

override fun toMessage(data: T): Message {
override fun toMessage(topic: String, data: T): Message {
val stringValue = jsonAdapter.toJson(data)
return Message.Bytes(stringValue.toByteArray())
}

override fun contentType() = "application/json"

private companion object {
private val UTF8_BOM = ByteString.decodeHex("EFBBBF")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ private class ProtobufMessageAdapter<T : MessageLite> constructor(
private val registry: ExtensionRegistryLite?
) : MessageAdapter<T> {

override fun fromMessage(message: Message): T {
override fun fromMessage(topic: String, message: Message): T {
val bytesValue = when (message) {
is Message.Bytes -> message.value
}
Expand All @@ -31,7 +31,9 @@ private class ProtobufMessageAdapter<T : MessageLite> constructor(
}
}

override fun toMessage(data: T): Message = Message.Bytes(data.toByteArray())
override fun toMessage(topic: String, data: T): Message = Message.Bytes(data.toByteArray())

override fun contentType() = "application/x-protobuf"
}

class ProtobufMessageAdapterFactory(
Expand Down
24 changes: 16 additions & 8 deletions courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ internal class Coordinator(
@Synchronized
override fun send(stubMethod: StubMethod.Send, args: Array<Any>): Any {
val data = stubMethod.argumentProcessor.getDataArgument(args)
val message = stubMethod.messageAdapter.toMessage(data)
stubMethod.argumentProcessor.inject(args)
val topic = stubMethod.argumentProcessor.getTopic()
val message = stubMethod.messageAdapter.toMessage(topic, data)
stubMethod.argumentProcessor.inject(args)
return client.send(message, topic, stubMethod.qos)
}

Expand All @@ -50,9 +50,13 @@ internal class Coordinator(
)

val stream = flowable
.map { it.message }
.observeOn(Schedulers.computation())
.flatMap { message -> message.adapt(stubMethod.messageAdapter)?.let { Flowable.just(it) } ?: Flowable.empty() }
.flatMap { mqttMessage ->
mqttMessage.message.adapt(
mqttMessage.topic,
stubMethod.messageAdapter
)?.let { Flowable.just(it) } ?: Flowable.empty()
}
.toStream()
return stubMethod.streamAdapter.adapt(stream)
}
Expand Down Expand Up @@ -87,9 +91,13 @@ internal class Coordinator(
)

val stream = flowable
.map { it.message }
.observeOn(Schedulers.computation())
.flatMap { message -> message.adapt(stubMethod.messageAdapter)?.let { Flowable.just(it) } ?: Flowable.empty() }
.flatMap { mqttMessage ->
mqttMessage.message.adapt(
mqttMessage.topic,
stubMethod.messageAdapter
)?.let { Flowable.just(it) } ?: Flowable.empty()
}
.toStream()
return stubMethod.streamAdapter.adapt(stream)
}
Expand All @@ -113,9 +121,9 @@ internal class Coordinator(
}
}

private fun <T> Message.adapt(messageAdapter: MessageAdapter<T>): T? {
private fun <T> Message.adapt(topic: String, messageAdapter: MessageAdapter<T>): T? {
return try {
val message = messageAdapter.fromMessage(this)
val message = messageAdapter.fromMessage(topic, this)
logger.d("Coordinator", "Message after parsing: $message")
message
} catch (th: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import com.gojek.courier.MessageAdapter

internal class ByteArrayMessageAdapter : MessageAdapter<ByteArray> {

override fun fromMessage(message: Message): ByteArray = when (message) {
override fun fromMessage(topic: String, message: Message): ByteArray = when (message) {
is Message.Bytes -> message.value
}

override fun toMessage(data: ByteArray): Message = Message.Bytes(data)
override fun toMessage(topic: String, data: ByteArray): Message = Message.Bytes(data)

override fun contentType() = "application/octet-stream"
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import com.gojek.courier.MessageAdapter

internal class TextMessageAdapter : MessageAdapter<String> {

override fun fromMessage(message: Message): String = when (message) {
override fun fromMessage(topic: String, message: Message): String = when (message) {
is Message.Bytes -> String(message.value)
}

override fun toMessage(data: String): Message = Message.Bytes(data.toByteArray())
override fun toMessage(topic: String, data: String): Message = Message.Bytes(data.toByteArray())

override fun contentType() = "text/plain"
}

0 comments on commit 09892e0

Please sign in to comment.