Skip to content

Commit 854b39a

Browse files
authored
Add SSE style delimiting for message streaming (#109)
The response_to_json_translator currently supports streaming the messages as JSON array or as new line delimited JSON. This commit adds support for Server-Sent Events (SSE) message framing (data: <message>\n\n) for streamed messages.
1 parent c25637e commit 854b39a

File tree

3 files changed

+127
-7
lines changed

3 files changed

+127
-7
lines changed

src/include/grpc_transcoding/response_to_json_translator.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ struct JsonResponseTranslateOptions {
7070
// If set to false, all streaming messages are treated as a JSON array and
7171
// separated by comma.
7272
bool stream_newline_delimited;
73+
74+
// If true, enforces Server-Sent Events (SSE) message framing (`data: <message>\n\n`)
75+
// and, `stream_newline_delimited` is ignored.
76+
// If false, message framing is determined by `stream_newline_delimited`.
77+
bool stream_sse_style_delimited;
7378
};
7479

7580
class ResponseToJsonTranslator : public MessageStream {
@@ -84,7 +89,7 @@ class ResponseToJsonTranslator : public MessageStream {
8489
::google::protobuf::util::TypeResolver* type_resolver,
8590
std::string type_url, bool streaming, TranscoderInputStream* in,
8691
const JsonResponseTranslateOptions& options = {
87-
::google::protobuf::util::JsonPrintOptions(), false});
92+
::google::protobuf::util::JsonPrintOptions(), false, false});
8893

8994
// MessageStream implementation
9095
bool NextMessage(std::string* message);

src/response_to_json_translator.cc

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
//
1717
#include "grpc_transcoding/response_to_json_translator.h"
1818

19+
#include <algorithm>
20+
#include <cstring>
1921
#include <string>
2022

2123
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"
@@ -65,10 +67,11 @@ bool ResponseToJsonTranslator::NextMessage(std::string* message) {
6567
return false;
6668
}
6769
} else if (streaming_ && reader_.Finished()) {
68-
if (!options_.stream_newline_delimited) {
69-
// This is a non-newline-delimited streaming call and the input is
70-
// finished. Return the final ']'
71-
// or "[]" in case this was an empty stream.
70+
if (!options_.stream_newline_delimited &&
71+
!options_.stream_sse_style_delimited) {
72+
// This is a non-newline-delimited and non-SSE-style-delimited streaming
73+
// call and the input is finished. Return the final ']' or "[]" in case
74+
// this was an empty stream.
7275
*message = first_ ? "[]" : "]";
7376
}
7477
finished_ = true;
@@ -95,14 +98,51 @@ bool WriteChar(::google::protobuf::io::ZeroCopyOutputStream* stream, char c) {
9598
return true;
9699
}
97100

101+
// A helper to write a string to a ZeroCopyOutputStream.
102+
bool WriteString(::google::protobuf::io::ZeroCopyOutputStream* stream,
103+
const std::string& str) {
104+
int bytes_to_write = str.size();
105+
int bytes_written = 0;
106+
while (bytes_written < bytes_to_write) {
107+
int size = 0;
108+
void* data;
109+
if (!stream->Next(&data, &size) || size == 0) {
110+
return false;
111+
}
112+
int bytes_to_write_this_iteration =
113+
std::min(bytes_to_write - bytes_written, size);
114+
memcpy(data, str.data() + bytes_written, bytes_to_write_this_iteration);
115+
bytes_written += bytes_to_write_this_iteration;
116+
if (bytes_to_write_this_iteration < size) {
117+
stream->BackUp(size - bytes_to_write_this_iteration);
118+
}
119+
}
120+
return true;
121+
}
122+
98123
} // namespace
99124

100125
bool ResponseToJsonTranslator::TranslateMessage(
101126
::google::protobuf::io::ZeroCopyInputStream* proto_in,
102127
std::string* json_out) {
103128
::google::protobuf::io::StringOutputStream json_stream(json_out);
104129

105-
if (streaming_ && !options_.stream_newline_delimited) {
130+
if (streaming_ && options_.stream_sse_style_delimited) {
131+
if (first_) {
132+
if (!WriteString(&json_stream, "data: ")) {
133+
status_ = absl::Status(absl::StatusCode::kInternal,
134+
"Failed to build the response message.");
135+
return false;
136+
}
137+
first_ = false;
138+
} else {
139+
if (!WriteString(&json_stream, "\n\ndata: ")) {
140+
status_ = absl::Status(absl::StatusCode::kInternal,
141+
"Failed to build the response message.");
142+
return false;
143+
}
144+
}
145+
} else if (streaming_ && !options_.stream_newline_delimited) {
106146
if (first_) {
107147
// This is a non-newline-delimited streaming call and this is the first
108148
// message, so prepend the
@@ -134,7 +174,7 @@ bool ResponseToJsonTranslator::TranslateMessage(
134174
}
135175

136176
// Append a newline delimiter after the message if needed.
137-
if (streaming_ && options_.stream_newline_delimited) {
177+
if (streaming_ && options_.stream_newline_delimited && !options_.stream_sse_style_delimited) {
138178
if (!WriteChar(&json_stream, '\n')) {
139179
status_ = absl::Status(absl::StatusCode::kInternal,
140180
"Failed to build the response message.");

test/response_to_json_translator_test.cc

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,81 @@ TEST_F(ResponseToJsonTranslatorTest, StreamingNewlineDelimitedDirectTest) {
911911
EXPECT_FALSE(translator.NextMessage(&message));
912912
}
913913

914+
TEST_F(ResponseToJsonTranslatorTest, StreamingSSEStyleDelimitedDirectTest) {
915+
// Load the service config
916+
::google::api::Service service;
917+
ASSERT_TRUE(
918+
transcoding::testing::LoadService("bookstore_service.pb.txt", &service));
919+
920+
// Create a TypeHelper using the service config
921+
TypeHelper type_helper(service.types(), service.enums());
922+
923+
// Messages to test
924+
auto test_message1 =
925+
GenerateGrpcMessage<Shelf>(R"(name : "1" theme : "Fiction")");
926+
auto test_message2 =
927+
GenerateGrpcMessage<Shelf>(R"(name : "2" theme : "Fantasy")");
928+
auto test_message3 =
929+
GenerateGrpcMessage<Shelf>(R"(name : "3" theme : "Children")");
930+
auto test_message4 =
931+
GenerateGrpcMessage<Shelf>(R"(name : "4" theme : "Classics")");
932+
933+
TestZeroCopyInputStream input_stream;
934+
ResponseToJsonTranslator translator(
935+
type_helper.Resolver(), "type.googleapis.com/Shelf", true, &input_stream,
936+
{pbutil::JsonPrintOptions(), true, true});
937+
938+
std::string message;
939+
// There is nothing translated
940+
EXPECT_FALSE(translator.NextMessage(&message));
941+
942+
// Add test_message1 to the stream
943+
input_stream.AddChunk(test_message1);
944+
945+
// Now we should have the test_message1 translated
946+
EXPECT_TRUE(translator.NextMessage(&message));
947+
EXPECT_EQ("data: {\"name\":\"1\",\"theme\":\"Fiction\"}", message);
948+
949+
// No more messages, but not finished yet
950+
EXPECT_FALSE(translator.NextMessage(&message));
951+
EXPECT_FALSE(translator.Finished());
952+
953+
// Add the test_message2, test_message3 and part of test_message4
954+
input_stream.AddChunk(test_message2);
955+
input_stream.AddChunk(test_message3);
956+
input_stream.AddChunk(test_message4.substr(0, 10));
957+
958+
// Now we should have test_message2 & test_message3 translated
959+
EXPECT_TRUE(translator.NextMessage(&message));
960+
EXPECT_EQ("\n\ndata: {\"name\":\"2\",\"theme\":\"Fantasy\"}", message);
961+
962+
EXPECT_TRUE(translator.NextMessage(&message));
963+
EXPECT_EQ("\n\ndata: {\"name\":\"3\",\"theme\":\"Children\"}", message);
964+
965+
// No more messages, but not finished yet
966+
EXPECT_FALSE(translator.NextMessage(&message));
967+
EXPECT_FALSE(translator.Finished());
968+
969+
// Add the rest of test_message4
970+
input_stream.AddChunk(test_message4.substr(10));
971+
972+
// Now we should have the test_message4 translated
973+
EXPECT_TRUE(translator.NextMessage(&message));
974+
EXPECT_EQ("\n\ndata: {\"name\":\"4\",\"theme\":\"Classics\"}", message);
975+
976+
// No more messages, but not finished yet
977+
EXPECT_FALSE(translator.NextMessage(&message));
978+
EXPECT_FALSE(translator.Finished());
979+
980+
// Now finish the stream
981+
input_stream.Finish();
982+
983+
// All done!
984+
EXPECT_TRUE(translator.NextMessage(&message));
985+
EXPECT_TRUE(translator.Finished());
986+
EXPECT_FALSE(translator.NextMessage(&message));
987+
}
988+
914989
TEST_F(ResponseToJsonTranslatorTest, Streaming5KMessages) {
915990
// Load the service config
916991
::google::api::Service service;

0 commit comments

Comments
 (0)