diff --git a/runtime/handler.go b/runtime/handler.go index 38bd516a024..cdf5510d247 100644 --- a/runtime/handler.go +++ b/runtime/handler.go @@ -38,6 +38,11 @@ func ForwardResponseStream(ctx context.Context, mux *ServeMux, marshaler Marshal return } + var delimiter []byte + if d, ok := marshaler.(Delimited); ok { + delimiter = d.Delimiter() + } + var wroteHeader bool for { resp, err := recv() @@ -64,6 +69,10 @@ func ForwardResponseStream(ctx context.Context, mux *ServeMux, marshaler Marshal return } wroteHeader = true + if _, err = w.Write(delimiter); err != nil { + grpclog.Printf("Failed to send delimiter chunk: %v", err) + return + } f.Flush() } } diff --git a/runtime/handler_test.go b/runtime/handler_test.go index d63948cdd9f..344521c8a9c 100644 --- a/runtime/handler_test.go +++ b/runtime/handler_test.go @@ -93,6 +93,7 @@ func TestForwardResponseStream(t *testing.T) { t.Errorf("marshaler.Marshal() failed %v", err) } want = append(want, b...) + want = append(want, marshaler.Delimiter()...) } if string(body) != string(want) { diff --git a/runtime/marshal_json.go b/runtime/marshal_json.go index 0acd2ca29ef..b3a21418be4 100644 --- a/runtime/marshal_json.go +++ b/runtime/marshal_json.go @@ -35,3 +35,8 @@ func (j *JSONBuiltin) NewDecoder(r io.Reader) Decoder { func (j *JSONBuiltin) NewEncoder(w io.Writer) Encoder { return json.NewEncoder(w) } + +// Delimiter for newline encoded JSON streams. +func (j *JSONBuiltin) Delimiter() []byte { + return []byte("\n") +} diff --git a/runtime/marshal_jsonpb.go b/runtime/marshal_jsonpb.go index 49f13f7fc74..d42cc593e51 100644 --- a/runtime/marshal_jsonpb.go +++ b/runtime/marshal_jsonpb.go @@ -182,3 +182,8 @@ type protoEnum interface { } var typeProtoMessage = reflect.TypeOf((*proto.Message)(nil)).Elem() + +// Delimiter for newline encoded JSON streams. +func (j *JSONPb) Delimiter() []byte { + return []byte("\n") +} diff --git a/runtime/marshaler.go b/runtime/marshaler.go index 6d434f13cb4..98fe6e88ac5 100644 --- a/runtime/marshaler.go +++ b/runtime/marshaler.go @@ -40,3 +40,9 @@ type EncoderFunc func(v interface{}) error // Encode delegates invocations to the underlying function itself. func (f EncoderFunc) Encode(v interface{}) error { return f(v) } + +// Delimited defines the streaming delimiter. +type Delimited interface { + // Delimiter returns the record seperator for the stream. + Delimiter() []byte +}