Skip to content

Commit

Permalink
Google Cloud Pub/sub: 'messageId' and 'publishTime' should not be pub…
Browse files Browse the repository at this point in the history
…lished (#1795)

* #1786 - 'messageId' and 'publishTime' should not be published nor required when creating PubSubMessage for publishing.
  • Loading branch information
LGLO authored and ennru committed Jul 1, 2019
1 parent ed5770c commit 977218e
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import akka.stream.alpakka.googlecloud.pubsub._
import spray.json.DefaultJsonProtocol._
import spray.json.{deserializationError, DefaultJsonProtocol, JsString, JsValue, RootJsonFormat}
import spray.json.{deserializationError, DefaultJsonProtocol, JsObject, JsString, JsValue, JsonFormat, RootJsonFormat}

import scala.collection.immutable
import scala.concurrent.Future
Expand Down Expand Up @@ -57,7 +57,22 @@ private[pubsub] trait PubSubApi {
}
override def write(instant: Instant): JsValue = JsString(instant.toString)
}
private implicit val pubSubMessageFormat = DefaultJsonProtocol.jsonFormat4(PubSubMessage.apply)

private implicit val pubSubMessageFormat = {
val defaultFormat = DefaultJsonProtocol.jsonFormat4(PubSubMessage.apply)
val attributesFormat = implicitly[JsonFormat[Option[immutable.Map[String, String]]]]
new RootJsonFormat[PubSubMessage] {
def read(json: JsValue): PubSubMessage = defaultFormat.read(json)
//Do not publish "messageId" nor "publishTime"
def write(m: PubSubMessage): JsValue = {
val fields = List("data" -> JsString(m.data)) ++ m.attributes.map(
attrs => "attributes" -> attributesFormat.write(m.attributes)
)
JsObject(fields: _*)
}

}
}
private implicit val pubSubRequestFormat = DefaultJsonProtocol.jsonFormat1(PublishRequest.apply)
private implicit val gcePubSubResponseFormat = DefaultJsonProtocol.jsonFormat1(PublishResponse)
private implicit val receivedMessageFormat = DefaultJsonProtocol.jsonFormat2(ReceivedMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,12 @@ object PubSubConfig {
/**
*
* @param data the base64 encoded data
* @param messageId the message id
* @param messageId the message id given by server. It must not be populated when publishing.
* @param attributes optional extra attributes for this message.
* @param publishTime the time the message was published. It must not be populated when publishing.
*/
final case class PubSubMessage(data: String,
//Should be Option[String]. '""' is used as default when creating messages for publishing.
messageId: String,
attributes: Option[immutable.Map[String, String]] = None,
publishTime: Option[Instant] = None) {
Expand All @@ -99,9 +100,18 @@ final case class PubSubMessage(data: String,

object PubSubMessage {

def apply(data: String): PubSubMessage = PubSubMessage(data, "")

def apply(data: String, attributes: immutable.Map[String, String]): PubSubMessage =
PubSubMessage(data, "", Some(attributes), None)

/**
* Java API: create [[PubSubMessage]]
*/
def create(data: String) =
PubSubMessage(data)

@deprecated("Setting messageId when creating message for publishing is futile.", "1.1.0")
def create(data: String, messageId: String) =
PubSubMessage(data, messageId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ private static void example() throws NoSuchAlgorithmException, InvalidKeySpecExc

// #publish-single
PubSubMessage publishMessage =
PubSubMessage.create(
"1", new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PubSubMessage.create(new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PublishRequest publishRequest = PublishRequest.of(Lists.newArrayList(publishMessage));

Source<PublishRequest, NotUsed> source = Source.single(publishRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GooglePubSubSpec extends FlatSpec with MockitoSugar with ScalaFutures with
config = config
)

val request = PublishRequest(Seq(PubSubMessage(messageId = "1", data = base64String("Hello Google!"))))
val request = PublishRequest(Seq(PubSubMessage(data = base64String("Hello Google!"))))

val source = Source(List(request))

Expand Down Expand Up @@ -87,7 +87,7 @@ class GooglePubSubSpec extends FlatSpec with MockitoSugar with ScalaFutures with
config = config
)

val request = PublishRequest(Seq(PubSubMessage(messageId = "2", data = base64String("Hello Google!"))))
val request = PublishRequest(Seq(PubSubMessage(data = base64String("Hello Google!"))))

val source = Source(List(request))
val result = source.via(flow).runWith(Sink.seq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,49 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi

val publishMessage =
PubSubMessage(
messageId = "1",
data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)),
attributes = Map("row_id" -> "7")
)
val publishRequest = PublishRequest(Seq(publishMessage))

val expectedPublishRequest =
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}"""
val publishResponse = """{"messageIds":["1"]}"""

mock.register(
WireMock
.post(
urlEqualTo(s"/v1/projects/${config.projectId}/topics/topic1:publish")
)
.withRequestBody(WireMock.equalToJson(expectedPublishRequest))
.withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(
aResponse()
.withStatus(200)
.withBody(publishResponse)
.withHeader("Content-Type", "application/json")
)
)

val result =
TestHttpApi.publish(config.projectId, "topic1", Some(accessToken), publishRequest)

result.futureValue shouldBe Seq("1")
}

it should "not send 'messageId' and 'publishTime' when publishing" in {

val publishMessage =
PubSubMessage(
data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)),
messageId = "my-id",
attributes = Some(Map("row_id" -> "7")),
publishTime = Some(Instant.parse("2014-10-02T15:01:23.045123456Z"))
)
val publishRequest = PublishRequest(Seq(publishMessage))

val expectedPublishRequest =
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","messageId":"1","attributes":{"row_id":"7"},"publishTime":"2014-10-02T15:01:23.045123456Z"}]}"""
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}"""
val publishResponse = """{"messageIds":["1"]}"""

mock.register(
Expand Down Expand Up @@ -93,11 +127,11 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi
}

val publishMessage =
PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
PubSubMessage(data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))

val expectedPublishRequest =
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","messageId":"1"}]}"""
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ=="}]}"""
val publishResponse = """{"messageIds":["1"]}"""

mock.register(
Expand All @@ -123,7 +157,7 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi

it should "Pull with results" in {

val publishMessage =
val message =
PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)))

val pullResponse =
Expand All @@ -144,7 +178,7 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi
)

val result = TestHttpApi.pull(config.projectId, "sub1", Some(accessToken), true, 1000)
result.futureValue shouldBe PullResponse(Some(Seq(ReceivedMessage("ack1", publishMessage))))
result.futureValue shouldBe PullResponse(Some(Seq(ReceivedMessage("ack1", message))))

}

Expand All @@ -155,7 +189,7 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi
val GoogleApisHost = s"http://localhost:${wiremockServer.port()}"
}

val publishMessage =
val message =
PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)))

val pullResponse =
Expand All @@ -176,7 +210,7 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi
)

val result = TestEmulatorHttpApi.pull(config.projectId, "sub1", None, true, 1000)
result.futureValue shouldBe PullResponse(Some(Seq(ReceivedMessage("ack1", publishMessage))))
result.futureValue shouldBe PullResponse(Some(Seq(ReceivedMessage("ack1", message))))

}

Expand Down Expand Up @@ -227,16 +261,14 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi
it should "return exception with the meaningful error message in case of not successful publish response" in {
val publishMessage =
PubSubMessage(
messageId = "1",
data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)),
attributes = Some(Map("row_id" -> "7")),
publishTime = Some(Instant.parse("2014-10-02T15:01:23.045123456Z"))
attributes = Map("row_id" -> "7")
)

val publishRequest = PublishRequest(Seq(publishMessage))

val expectedPublishRequest =
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","messageId":"1","attributes":{"row_id":"7"},"publishTime":"2014-10-02T15:01:23.045123456Z"}]}"""
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}"""

mock.register(
WireMock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ExampleUsage {

//#publish-single
val publishMessage =
PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
PubSubMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))

val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)
Expand Down

0 comments on commit 977218e

Please sign in to comment.