diff --git a/README.md b/README.md index e4d1bcf3223..e812f65dd7d 100644 --- a/README.md +++ b/README.md @@ -154,11 +154,11 @@ More examples are available under `examples` directory. * Generating JSON API handlers * Method parameters in request body * Method parameters in request path +* Mppping streaming APIs to JSON streams ### Want to support But not yet. * Integrated authentication -* Streaming APIs * bytes and enum fields in path parameter * Method parameters in query string * Encoding request/response body in application/x-www-form-urlencoded diff --git a/examples/a_bit_of_everything.pb.go b/examples/a_bit_of_everything.pb.go index 64d30459b14..905560190ea 100644 --- a/examples/a_bit_of_everything.pb.go +++ b/examples/a_bit_of_everything.pb.go @@ -85,10 +85,13 @@ func init() { type ABitOfEverythingServiceClient interface { Create(ctx context.Context, in *ABitOfEverything, opts ...grpc.CallOption) (*ABitOfEverything, error) CreateBody(ctx context.Context, in *ABitOfEverything, opts ...grpc.CallOption) (*ABitOfEverything, error) + BulkCreate(ctx context.Context, opts ...grpc.CallOption) (ABitOfEverythingService_BulkCreateClient, error) Lookup(ctx context.Context, in *IdMessage, opts ...grpc.CallOption) (*ABitOfEverything, error) + List(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (ABitOfEverythingService_ListClient, error) Update(ctx context.Context, in *ABitOfEverything, opts ...grpc.CallOption) (*EmptyMessage, error) Delete(ctx context.Context, in *IdMessage, opts ...grpc.CallOption) (*EmptyMessage, error) Echo(ctx context.Context, in *gengo_grpc_gateway_examples_sub.StringMessage, opts ...grpc.CallOption) (*gengo_grpc_gateway_examples_sub.StringMessage, error) + BulkEcho(ctx context.Context, opts ...grpc.CallOption) (ABitOfEverythingService_BulkEchoClient, error) } type aBitOfEverythingServiceClient struct { @@ -117,6 +120,40 @@ func (c *aBitOfEverythingServiceClient) CreateBody(ctx context.Context, in *ABit return out, nil } +func (c *aBitOfEverythingServiceClient) BulkCreate(ctx context.Context, opts ...grpc.CallOption) (ABitOfEverythingService_BulkCreateClient, error) { + stream, err := grpc.NewClientStream(ctx, &_ABitOfEverythingService_serviceDesc.Streams[0], c.cc, "/gengo.grpc.gateway.examples.ABitOfEverythingService/BulkCreate", opts...) + if err != nil { + return nil, err + } + x := &aBitOfEverythingServiceBulkCreateClient{stream} + return x, nil +} + +type ABitOfEverythingService_BulkCreateClient interface { + Send(*ABitOfEverything) error + CloseAndRecv() (*EmptyMessage, error) + grpc.ClientStream +} + +type aBitOfEverythingServiceBulkCreateClient struct { + grpc.ClientStream +} + +func (x *aBitOfEverythingServiceBulkCreateClient) Send(m *ABitOfEverything) error { + return x.ClientStream.SendMsg(m) +} + +func (x *aBitOfEverythingServiceBulkCreateClient) CloseAndRecv() (*EmptyMessage, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(EmptyMessage) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *aBitOfEverythingServiceClient) Lookup(ctx context.Context, in *IdMessage, opts ...grpc.CallOption) (*ABitOfEverything, error) { out := new(ABitOfEverything) err := grpc.Invoke(ctx, "/gengo.grpc.gateway.examples.ABitOfEverythingService/Lookup", in, out, c.cc, opts...) @@ -126,6 +163,38 @@ func (c *aBitOfEverythingServiceClient) Lookup(ctx context.Context, in *IdMessag return out, nil } +func (c *aBitOfEverythingServiceClient) List(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (ABitOfEverythingService_ListClient, error) { + stream, err := grpc.NewClientStream(ctx, &_ABitOfEverythingService_serviceDesc.Streams[1], c.cc, "/gengo.grpc.gateway.examples.ABitOfEverythingService/List", opts...) + if err != nil { + return nil, err + } + x := &aBitOfEverythingServiceListClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type ABitOfEverythingService_ListClient interface { + Recv() (*ABitOfEverything, error) + grpc.ClientStream +} + +type aBitOfEverythingServiceListClient struct { + grpc.ClientStream +} + +func (x *aBitOfEverythingServiceListClient) Recv() (*ABitOfEverything, error) { + m := new(ABitOfEverything) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *aBitOfEverythingServiceClient) Update(ctx context.Context, in *ABitOfEverything, opts ...grpc.CallOption) (*EmptyMessage, error) { out := new(EmptyMessage) err := grpc.Invoke(ctx, "/gengo.grpc.gateway.examples.ABitOfEverythingService/Update", in, out, c.cc, opts...) @@ -153,15 +222,49 @@ func (c *aBitOfEverythingServiceClient) Echo(ctx context.Context, in *gengo_grpc return out, nil } +func (c *aBitOfEverythingServiceClient) BulkEcho(ctx context.Context, opts ...grpc.CallOption) (ABitOfEverythingService_BulkEchoClient, error) { + stream, err := grpc.NewClientStream(ctx, &_ABitOfEverythingService_serviceDesc.Streams[2], c.cc, "/gengo.grpc.gateway.examples.ABitOfEverythingService/BulkEcho", opts...) + if err != nil { + return nil, err + } + x := &aBitOfEverythingServiceBulkEchoClient{stream} + return x, nil +} + +type ABitOfEverythingService_BulkEchoClient interface { + Send(*gengo_grpc_gateway_examples_sub.StringMessage) error + Recv() (*gengo_grpc_gateway_examples_sub.StringMessage, error) + grpc.ClientStream +} + +type aBitOfEverythingServiceBulkEchoClient struct { + grpc.ClientStream +} + +func (x *aBitOfEverythingServiceBulkEchoClient) Send(m *gengo_grpc_gateway_examples_sub.StringMessage) error { + return x.ClientStream.SendMsg(m) +} + +func (x *aBitOfEverythingServiceBulkEchoClient) Recv() (*gengo_grpc_gateway_examples_sub.StringMessage, error) { + m := new(gengo_grpc_gateway_examples_sub.StringMessage) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Server API for ABitOfEverythingService service type ABitOfEverythingServiceServer interface { Create(context.Context, *ABitOfEverything) (*ABitOfEverything, error) CreateBody(context.Context, *ABitOfEverything) (*ABitOfEverything, error) + BulkCreate(ABitOfEverythingService_BulkCreateServer) error Lookup(context.Context, *IdMessage) (*ABitOfEverything, error) + List(*EmptyMessage, ABitOfEverythingService_ListServer) error Update(context.Context, *ABitOfEverything) (*EmptyMessage, error) Delete(context.Context, *IdMessage) (*EmptyMessage, error) Echo(context.Context, *gengo_grpc_gateway_examples_sub.StringMessage) (*gengo_grpc_gateway_examples_sub.StringMessage, error) + BulkEcho(ABitOfEverythingService_BulkEchoServer) error } func RegisterABitOfEverythingServiceServer(s *grpc.Server, srv ABitOfEverythingServiceServer) { @@ -192,6 +295,32 @@ func _ABitOfEverythingService_CreateBody_Handler(srv interface{}, ctx context.Co return out, nil } +func _ABitOfEverythingService_BulkCreate_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(ABitOfEverythingServiceServer).BulkCreate(&aBitOfEverythingServiceBulkCreateServer{stream}) +} + +type ABitOfEverythingService_BulkCreateServer interface { + SendAndClose(*EmptyMessage) error + Recv() (*ABitOfEverything, error) + grpc.ServerStream +} + +type aBitOfEverythingServiceBulkCreateServer struct { + grpc.ServerStream +} + +func (x *aBitOfEverythingServiceBulkCreateServer) SendAndClose(m *EmptyMessage) error { + return x.ServerStream.SendMsg(m) +} + +func (x *aBitOfEverythingServiceBulkCreateServer) Recv() (*ABitOfEverything, error) { + m := new(ABitOfEverything) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func _ABitOfEverythingService_Lookup_Handler(srv interface{}, ctx context.Context, buf []byte) (interface{}, error) { in := new(IdMessage) if err := proto.Unmarshal(buf, in); err != nil { @@ -204,6 +333,27 @@ func _ABitOfEverythingService_Lookup_Handler(srv interface{}, ctx context.Contex return out, nil } +func _ABitOfEverythingService_List_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(EmptyMessage) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ABitOfEverythingServiceServer).List(m, &aBitOfEverythingServiceListServer{stream}) +} + +type ABitOfEverythingService_ListServer interface { + Send(*ABitOfEverything) error + grpc.ServerStream +} + +type aBitOfEverythingServiceListServer struct { + grpc.ServerStream +} + +func (x *aBitOfEverythingServiceListServer) Send(m *ABitOfEverything) error { + return x.ServerStream.SendMsg(m) +} + func _ABitOfEverythingService_Update_Handler(srv interface{}, ctx context.Context, buf []byte) (interface{}, error) { in := new(ABitOfEverything) if err := proto.Unmarshal(buf, in); err != nil { @@ -240,6 +390,32 @@ func _ABitOfEverythingService_Echo_Handler(srv interface{}, ctx context.Context, return out, nil } +func _ABitOfEverythingService_BulkEcho_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(ABitOfEverythingServiceServer).BulkEcho(&aBitOfEverythingServiceBulkEchoServer{stream}) +} + +type ABitOfEverythingService_BulkEchoServer interface { + Send(*gengo_grpc_gateway_examples_sub.StringMessage) error + Recv() (*gengo_grpc_gateway_examples_sub.StringMessage, error) + grpc.ServerStream +} + +type aBitOfEverythingServiceBulkEchoServer struct { + grpc.ServerStream +} + +func (x *aBitOfEverythingServiceBulkEchoServer) Send(m *gengo_grpc_gateway_examples_sub.StringMessage) error { + return x.ServerStream.SendMsg(m) +} + +func (x *aBitOfEverythingServiceBulkEchoServer) Recv() (*gengo_grpc_gateway_examples_sub.StringMessage, error) { + m := new(gengo_grpc_gateway_examples_sub.StringMessage) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + var _ABitOfEverythingService_serviceDesc = grpc.ServiceDesc{ ServiceName: "gengo.grpc.gateway.examples.ABitOfEverythingService", HandlerType: (*ABitOfEverythingServiceServer)(nil), @@ -269,5 +445,22 @@ var _ABitOfEverythingService_serviceDesc = grpc.ServiceDesc{ Handler: _ABitOfEverythingService_Echo_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "BulkCreate", + Handler: _ABitOfEverythingService_BulkCreate_Handler, + ClientStreams: true, + }, + { + StreamName: "List", + Handler: _ABitOfEverythingService_List_Handler, + ServerStreams: true, + }, + { + StreamName: "BulkEcho", + Handler: _ABitOfEverythingService_BulkEcho_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, } diff --git a/examples/a_bit_of_everything.pb.gw.go b/examples/a_bit_of_everything.pb.gw.go index dd280c6bb2f..a3a12810987 100644 --- a/examples/a_bit_of_everything.pb.gw.go +++ b/examples/a_bit_of_everything.pb.gw.go @@ -11,24 +11,26 @@ package main import ( "encoding/json" - "fmt" + "io" "net/http" - "github.com/gengo/grpc-gateway/convert" + "github.com/gengo/grpc-gateway/runtime" "github.com/golang/glog" "github.com/golang/protobuf/proto" "github.com/zenazn/goji/web" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/codes" gengo_grpc_gateway_examples_sub "github.com/gengo/grpc-gateway/examples/sub" ) -var _ fmt.Stringer -var _ = convert.String +var _ codes.Code +var _ io.Reader +var _ = runtime.String -func handle_ABitOfEverythingService_Create(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { - protoReq := new(ABitOfEverything) +func request_ABitOfEverythingService_Create(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { + var protoReq ABitOfEverything if err = json.NewDecoder(req.Body).Decode(&protoReq); err != nil { return nil, err @@ -39,163 +41,196 @@ func handle_ABitOfEverythingService_Create(ctx context.Context, c web.C, client val, ok = c.URLParams["float_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "float_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "float_value") } - protoReq.FloatValue, err = convert.Float32(val) + protoReq.FloatValue, err = runtime.Float32(val) if err != nil { return nil, err } val, ok = c.URLParams["double_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "double_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "double_value") } - protoReq.DoubleValue, err = convert.Float64(val) + protoReq.DoubleValue, err = runtime.Float64(val) if err != nil { return nil, err } val, ok = c.URLParams["int64_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "int64_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "int64_value") } - protoReq.Int64Value, err = convert.Int64(val) + protoReq.Int64Value, err = runtime.Int64(val) if err != nil { return nil, err } val, ok = c.URLParams["uint64_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "uint64_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "uint64_value") } - protoReq.Uint64Value, err = convert.Uint64(val) + protoReq.Uint64Value, err = runtime.Uint64(val) if err != nil { return nil, err } val, ok = c.URLParams["int32_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "int32_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "int32_value") } - protoReq.Int32Value, err = convert.Int32(val) + protoReq.Int32Value, err = runtime.Int32(val) if err != nil { return nil, err } val, ok = c.URLParams["fixed64_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "fixed64_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "fixed64_value") } - protoReq.Fixed64Value, err = convert.Uint64(val) + protoReq.Fixed64Value, err = runtime.Uint64(val) if err != nil { return nil, err } val, ok = c.URLParams["fixed32_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "fixed32_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "fixed32_value") } - protoReq.Fixed32Value, err = convert.Uint32(val) + protoReq.Fixed32Value, err = runtime.Uint32(val) if err != nil { return nil, err } val, ok = c.URLParams["bool_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "bool_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "bool_value") } - protoReq.BoolValue, err = convert.Bool(val) + protoReq.BoolValue, err = runtime.Bool(val) if err != nil { return nil, err } val, ok = c.URLParams["string_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "string_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "string_value") } - protoReq.StringValue, err = convert.String(val) + protoReq.StringValue, err = runtime.String(val) if err != nil { return nil, err } val, ok = c.URLParams["uint32_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "uint32_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "uint32_value") } - protoReq.Uint32Value, err = convert.Uint32(val) + protoReq.Uint32Value, err = runtime.Uint32(val) if err != nil { return nil, err } val, ok = c.URLParams["sfixed32_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "sfixed32_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "sfixed32_value") } - protoReq.Sfixed32Value, err = convert.Int32(val) + protoReq.Sfixed32Value, err = runtime.Int32(val) if err != nil { return nil, err } val, ok = c.URLParams["sfixed64_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "sfixed64_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "sfixed64_value") } - protoReq.Sfixed64Value, err = convert.Int64(val) + protoReq.Sfixed64Value, err = runtime.Int64(val) if err != nil { return nil, err } val, ok = c.URLParams["sint32_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "sint32_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "sint32_value") } - protoReq.Sint32Value, err = convert.Int32(val) + protoReq.Sint32Value, err = runtime.Int32(val) if err != nil { return nil, err } val, ok = c.URLParams["sint64_value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "sint64_value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "sint64_value") } - protoReq.Sint64Value, err = convert.Int64(val) + protoReq.Sint64Value, err = runtime.Int64(val) if err != nil { return nil, err } - return client.Create(ctx, protoReq) + return client.Create(ctx, &protoReq) } -func handle_ABitOfEverythingService_CreateBody(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { - protoReq := new(ABitOfEverything) +func request_ABitOfEverythingService_CreateBody(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { + var protoReq ABitOfEverything if err = json.NewDecoder(req.Body).Decode(&protoReq); err != nil { return nil, err } - return client.CreateBody(ctx, protoReq) + return client.CreateBody(ctx, &protoReq) } -func handle_ABitOfEverythingService_Lookup(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { - protoReq := new(IdMessage) +func request_ABitOfEverythingService_BulkCreate(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { + stream, err := client.BulkCreate(ctx) + if err != nil { + glog.Errorf("Failed to start streaming: %v", err) + return nil, err + } + dec := json.NewDecoder(req.Body) + for { + var protoReq ABitOfEverything + err = dec.Decode(&protoReq) + if err == io.EOF { + break + } + if err != nil { + glog.Errorf("Failed to decode request: %v", err) + return nil, grpc.Errorf(codes.InvalidArgument, "%v", err) + } + if err = stream.Send(&protoReq); err != nil { + glog.Errorf("Failed to send request: %v", err) + return nil, err + } + } + + return stream.CloseAndRecv() + +} + +func request_ABitOfEverythingService_Lookup(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { + var protoReq IdMessage var val string var ok bool val, ok = c.URLParams["uuid"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "uuid") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "uuid") } - protoReq.Uuid, err = convert.String(val) + protoReq.Uuid, err = runtime.String(val) if err != nil { return nil, err } - return client.Lookup(ctx, protoReq) + return client.Lookup(ctx, &protoReq) +} + +func request_ABitOfEverythingService_List(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (ABitOfEverythingService_ListClient, error) { + var protoReq EmptyMessage + + return client.List(ctx, &protoReq) } -func handle_ABitOfEverythingService_Update(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { - protoReq := new(ABitOfEverything) +func request_ABitOfEverythingService_Update(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { + var protoReq ABitOfEverything if err = json.NewDecoder(req.Body).Decode(&protoReq); err != nil { return nil, err @@ -206,52 +241,85 @@ func handle_ABitOfEverythingService_Update(ctx context.Context, c web.C, client val, ok = c.URLParams["uuid"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "uuid") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "uuid") } - protoReq.Uuid, err = convert.String(val) + protoReq.Uuid, err = runtime.String(val) if err != nil { return nil, err } - return client.Update(ctx, protoReq) + return client.Update(ctx, &protoReq) } -func handle_ABitOfEverythingService_Delete(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { - protoReq := new(IdMessage) +func request_ABitOfEverythingService_Delete(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { + var protoReq IdMessage var val string var ok bool val, ok = c.URLParams["uuid"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "uuid") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "uuid") } - protoReq.Uuid, err = convert.String(val) + protoReq.Uuid, err = runtime.String(val) if err != nil { return nil, err } - return client.Delete(ctx, protoReq) + return client.Delete(ctx, &protoReq) } -func handle_ABitOfEverythingService_Echo(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { - protoReq := new(gengo_grpc_gateway_examples_sub.StringMessage) +func request_ABitOfEverythingService_Echo(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (msg proto.Message, err error) { + var protoReq gengo_grpc_gateway_examples_sub.StringMessage var val string var ok bool val, ok = c.URLParams["value"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "value") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "value") + } + protoReq.Value, err = runtime.StringP(val) + if err != nil { + return nil, err } - protoReq.Value, err = convert.StringP(val) + + return client.Echo(ctx, &protoReq) +} + +func request_ABitOfEverythingService_BulkEcho(ctx context.Context, c web.C, client ABitOfEverythingServiceClient, req *http.Request) (ABitOfEverythingService_BulkEchoClient, error) { + stream, err := client.BulkEcho(ctx) if err != nil { + glog.Errorf("Failed to start streaming: %v", err) return nil, err } + dec := json.NewDecoder(req.Body) + for { + var protoReq gengo_grpc_gateway_examples_sub.StringMessage + err = dec.Decode(&protoReq) + if err == io.EOF { + break + } + if err != nil { + glog.Errorf("Failed to decode request: %v", err) + return nil, grpc.Errorf(codes.InvalidArgument, "%v", err) + } + if err = stream.Send(&protoReq); err != nil { + glog.Errorf("Failed to send request: %v", err) + return nil, err + } + } + + if err = stream.CloseSend(); err != nil { + glog.Errorf("Failed to terminate client stream: %v", err) + return nil, err + } + return stream, nil - return client.Echo(ctx, protoReq) } +// RegisterABitOfEverythingServiceHandlerFromEndpoint is same as RegisterABitOfEverythingServiceHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. func RegisterABitOfEverythingServiceHandlerFromEndpoint(ctx context.Context, mux *web.Mux, endpoint string) (err error) { conn, err := grpc.Dial(endpoint) if err != nil { @@ -275,121 +343,108 @@ func RegisterABitOfEverythingServiceHandlerFromEndpoint(ctx context.Context, mux return RegisterABitOfEverythingServiceHandler(ctx, mux, conn) } +// RegisterABitOfEverythingServiceHandler registers the http handlers for service ABitOfEverythingService to "mux". +// The handlers forward requests to the grpc endpoint over "conn". func RegisterABitOfEverythingServiceHandler(ctx context.Context, mux *web.Mux, conn *grpc.ClientConn) error { client := NewABitOfEverythingServiceClient(conn) mux.Post("/v1/example/a_bit_of_everything/:float_value/:double_value/:int64_value/separator/:uint64_value/:int32_value/:fixed64_value/:fixed32_value/:bool_value/:string_value/:uint32_value/:sfixed32_value/:sfixed64_value/:sint32_value/:sint64_value", func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_ABitOfEverythingService_Create(ctx, c, client, req) - if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - buf, err := json.Marshal(resp) + resp, err := request_ABitOfEverythingService_Create(ctx, c, client, req) if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + + runtime.ForwardResponseMessage(w, resp) + }) mux.Post("/v1/example/a_bit_of_everything", func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_ABitOfEverythingService_CreateBody(ctx, c, client, req) + resp, err := request_ABitOfEverythingService_CreateBody(ctx, c, client, req) if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - buf, err := json.Marshal(resp) + + runtime.ForwardResponseMessage(w, resp) + + }) + + mux.Post("/v1/example/a_bit_of_everything/bulk", func(c web.C, w http.ResponseWriter, req *http.Request) { + resp, err := request_ABitOfEverythingService_BulkCreate(ctx, c, client, req) if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + + runtime.ForwardResponseMessage(w, resp) + }) mux.Get("/v1/example/a_bit_of_everything/:uuid", func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_ABitOfEverythingService_Lookup(ctx, c, client, req) + resp, err := request_ABitOfEverythingService_Lookup(ctx, c, client, req) if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - buf, err := json.Marshal(resp) + + runtime.ForwardResponseMessage(w, resp) + + }) + + mux.Get("/v1/example/a_bit_of_everything", func(c web.C, w http.ResponseWriter, req *http.Request) { + resp, err := request_ABitOfEverythingService_List(ctx, c, client, req) if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + + runtime.ForwardResponseStream(w, func() (proto.Message, error) { return resp.Recv() }) + }) mux.Put("/v1/example/a_bit_of_everything/:uuid", func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_ABitOfEverythingService_Update(ctx, c, client, req) - if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - buf, err := json.Marshal(resp) + resp, err := request_ABitOfEverythingService_Update(ctx, c, client, req) if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + + runtime.ForwardResponseMessage(w, resp) + }) mux.Delete("/v1/example/a_bit_of_everything/:uuid", func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_ABitOfEverythingService_Delete(ctx, c, client, req) + resp, err := request_ABitOfEverythingService_Delete(ctx, c, client, req) if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - buf, err := json.Marshal(resp) - if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + + runtime.ForwardResponseMessage(w, resp) + }) mux.Get("/v1/example/a_bit_of_everything/echo/:value", func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_ABitOfEverythingService_Echo(ctx, c, client, req) + resp, err := request_ABitOfEverythingService_Echo(ctx, c, client, req) if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - buf, err := json.Marshal(resp) + + runtime.ForwardResponseMessage(w, resp) + + }) + + mux.Post("/v1/example/a_bit_of_everything/echo", func(c web.C, w http.ResponseWriter, req *http.Request) { + resp, err := request_ABitOfEverythingService_BulkEcho(ctx, c, client, req) if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + + runtime.ForwardResponseStream(w, func() (proto.Message, error) { return resp.Recv() }) + }) return nil diff --git a/examples/a_bit_of_everything.proto b/examples/a_bit_of_everything.proto index 4447b3406c8..0333fc181b3 100644 --- a/examples/a_bit_of_everything.proto +++ b/examples/a_bit_of_everything.proto @@ -51,10 +51,22 @@ service ABitOfEverythingService { method: "POST" }; } + rpc BulkCreate(stream ABitOfEverything) returns (EmptyMessage) { + option (gengo.grpc.gateway.ApiMethodOptions.api_options) = { + path: "/v1/example/a_bit_of_everything/bulk" + method: "POST" + }; + } rpc Lookup(IdMessage) returns (ABitOfEverything) { option (gengo.grpc.gateway.ApiMethodOptions.api_options) = { path: "/v1/example/a_bit_of_everything/:uuid" - method: "Get" + method: "GET" + }; + } + rpc List(EmptyMessage) returns (stream ABitOfEverything) { + option (gengo.grpc.gateway.ApiMethodOptions.api_options) = { + path: "/v1/example/a_bit_of_everything" + method: "GET" }; } rpc Update(ABitOfEverything) returns (EmptyMessage) { @@ -75,4 +87,10 @@ service ABitOfEverythingService { method: "GET" }; } + rpc BulkEcho(stream gengo.grpc.gateway.examples.sub.StringMessage) returns (stream gengo.grpc.gateway.examples.sub.StringMessage) { + option (gengo.grpc.gateway.ApiMethodOptions.api_options) = { + path: "/v1/example/a_bit_of_everything/echo" + method: "POST" + }; + } } diff --git a/examples/echo_service.pb.gw.go b/examples/echo_service.pb.gw.go index ec3b2766809..510044bcf5b 100644 --- a/examples/echo_service.pb.gw.go +++ b/examples/echo_service.pb.gw.go @@ -11,48 +11,52 @@ package main import ( "encoding/json" - "fmt" + "io" "net/http" - "github.com/gengo/grpc-gateway/convert" + "github.com/gengo/grpc-gateway/runtime" "github.com/golang/glog" "github.com/golang/protobuf/proto" "github.com/zenazn/goji/web" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/codes" ) -var _ fmt.Stringer -var _ = convert.String +var _ codes.Code +var _ io.Reader +var _ = runtime.String -func handle_EchoService_Echo(ctx context.Context, c web.C, client EchoServiceClient, req *http.Request) (msg proto.Message, err error) { - protoReq := new(SimpleMessage) +func request_EchoService_Echo(ctx context.Context, c web.C, client EchoServiceClient, req *http.Request) (msg proto.Message, err error) { + var protoReq SimpleMessage var val string var ok bool val, ok = c.URLParams["id"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "id") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "id") } - protoReq.Id, err = convert.String(val) + protoReq.Id, err = runtime.String(val) if err != nil { return nil, err } - return client.Echo(ctx, protoReq) + return client.Echo(ctx, &protoReq) } -func handle_EchoService_EchoBody(ctx context.Context, c web.C, client EchoServiceClient, req *http.Request) (msg proto.Message, err error) { - protoReq := new(SimpleMessage) +func request_EchoService_EchoBody(ctx context.Context, c web.C, client EchoServiceClient, req *http.Request) (msg proto.Message, err error) { + var protoReq SimpleMessage if err = json.NewDecoder(req.Body).Decode(&protoReq); err != nil { return nil, err } - return client.EchoBody(ctx, protoReq) + return client.EchoBody(ctx, &protoReq) } +// RegisterEchoServiceHandlerFromEndpoint is same as RegisterEchoServiceHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. func RegisterEchoServiceHandlerFromEndpoint(ctx context.Context, mux *web.Mux, endpoint string) (err error) { conn, err := grpc.Dial(endpoint) if err != nil { @@ -76,45 +80,31 @@ func RegisterEchoServiceHandlerFromEndpoint(ctx context.Context, mux *web.Mux, e return RegisterEchoServiceHandler(ctx, mux, conn) } +// RegisterEchoServiceHandler registers the http handlers for service EchoService to "mux". +// The handlers forward requests to the grpc endpoint over "conn". func RegisterEchoServiceHandler(ctx context.Context, mux *web.Mux, conn *grpc.ClientConn) error { client := NewEchoServiceClient(conn) mux.Post("/v1/example/echo/:id", func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_EchoService_Echo(ctx, c, client, req) + resp, err := request_EchoService_Echo(ctx, c, client, req) if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - buf, err := json.Marshal(resp) - if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + + runtime.ForwardResponseMessage(w, resp) + }) mux.Post("/v1/example/echo_body", func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_EchoService_EchoBody(ctx, c, client, req) + resp, err := request_EchoService_EchoBody(ctx, c, client, req) if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - buf, err := json.Marshal(resp) - if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + + runtime.ForwardResponseMessage(w, resp) + }) return nil diff --git a/examples/server/main.go b/examples/server/main.go index cebe63df5ba..2eaec00a73c 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "io" "net" "sync" @@ -69,6 +70,24 @@ func (s *_ABitOfEverythingServer) CreateBody(ctx context.Context, msg *examples. return s.Create(ctx, msg) } +func (s *_ABitOfEverythingServer) BulkCreate(stream examples.ABitOfEverythingService_BulkCreateServer) error { + ctx := stream.Context() + for { + msg, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + glog.Error(msg) + if _, err = s.Create(ctx, msg); err != nil { + return err + } + } + return stream.SendAndClose(new(examples.EmptyMessage)) +} + func (s *_ABitOfEverythingServer) Lookup(ctx context.Context, msg *examples.IdMessage) (*examples.ABitOfEverything, error) { s.m.Lock() defer s.m.Unlock() @@ -80,6 +99,17 @@ func (s *_ABitOfEverythingServer) Lookup(ctx context.Context, msg *examples.IdMe return nil, grpc.Errorf(codes.NotFound, "not found") } +func (s *_ABitOfEverythingServer) List(_ *examples.EmptyMessage, stream examples.ABitOfEverythingService_ListServer) error { + s.m.Lock() + defer s.m.Unlock() + for _, msg := range s.v { + if err := stream.Send(msg); err != nil { + return err + } + } + return nil +} + func (s *_ABitOfEverythingServer) Update(ctx context.Context, msg *examples.ABitOfEverything) (*examples.EmptyMessage, error) { s.m.Lock() defer s.m.Unlock() @@ -114,6 +144,27 @@ func (s *_ABitOfEverythingServer) Echo(ctx context.Context, msg *sub.StringMessa return msg, nil } +func (s *_ABitOfEverythingServer) BulkEcho(stream examples.ABitOfEverythingService_BulkEchoServer) error { + var msgs []*sub.StringMessage + for { + msg, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + msgs = append(msgs, msg) + } + for _, msg := range msgs { + glog.Info(msg) + if err := stream.Send(msg); err != nil { + return err + } + } + return nil +} + func run() error { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) diff --git a/protoc-gen-grpc-gateway/generator.go b/protoc-gen-grpc-gateway/generator.go index 62e0ab7eb12..f5fde3ac8c7 100644 --- a/protoc-gen-grpc-gateway/generator.go +++ b/protoc-gen-grpc-gateway/generator.go @@ -156,48 +156,48 @@ func getAPIOptions(meth *descriptor.MethodDescriptorProto) (*options.ApiMethodOp var ( proto3ConvertFuncs = map[descriptor.FieldDescriptorProto_Type]string{ - descriptor.FieldDescriptorProto_TYPE_DOUBLE: "convert.Float64", - descriptor.FieldDescriptorProto_TYPE_FLOAT: "convert.Float32", - descriptor.FieldDescriptorProto_TYPE_INT64: "convert.Int64", - descriptor.FieldDescriptorProto_TYPE_UINT64: "convert.Uint64", - descriptor.FieldDescriptorProto_TYPE_INT32: "convert.Int32", - descriptor.FieldDescriptorProto_TYPE_FIXED64: "convert.Uint64", - descriptor.FieldDescriptorProto_TYPE_FIXED32: "convert.Uint32", - descriptor.FieldDescriptorProto_TYPE_BOOL: "convert.Bool", - descriptor.FieldDescriptorProto_TYPE_STRING: "convert.String", + descriptor.FieldDescriptorProto_TYPE_DOUBLE: "runtime.Float64", + descriptor.FieldDescriptorProto_TYPE_FLOAT: "runtime.Float32", + descriptor.FieldDescriptorProto_TYPE_INT64: "runtime.Int64", + descriptor.FieldDescriptorProto_TYPE_UINT64: "runtime.Uint64", + descriptor.FieldDescriptorProto_TYPE_INT32: "runtime.Int32", + descriptor.FieldDescriptorProto_TYPE_FIXED64: "runtime.Uint64", + descriptor.FieldDescriptorProto_TYPE_FIXED32: "runtime.Uint32", + descriptor.FieldDescriptorProto_TYPE_BOOL: "runtime.Bool", + descriptor.FieldDescriptorProto_TYPE_STRING: "runtime.String", // FieldDescriptorProto_TYPE_GROUP // FieldDescriptorProto_TYPE_MESSAGE // FieldDescriptorProto_TYPE_BYTES // TODO(yugui) Handle bytes - descriptor.FieldDescriptorProto_TYPE_UINT32: "convert.Uint32", + descriptor.FieldDescriptorProto_TYPE_UINT32: "runtime.Uint32", // FieldDescriptorProto_TYPE_ENUM // TODO(yugui) Handle Enum - descriptor.FieldDescriptorProto_TYPE_SFIXED32: "convert.Int32", - descriptor.FieldDescriptorProto_TYPE_SFIXED64: "convert.Int64", - descriptor.FieldDescriptorProto_TYPE_SINT32: "convert.Int32", - descriptor.FieldDescriptorProto_TYPE_SINT64: "convert.Int64", + descriptor.FieldDescriptorProto_TYPE_SFIXED32: "runtime.Int32", + descriptor.FieldDescriptorProto_TYPE_SFIXED64: "runtime.Int64", + descriptor.FieldDescriptorProto_TYPE_SINT32: "runtime.Int32", + descriptor.FieldDescriptorProto_TYPE_SINT64: "runtime.Int64", } proto2ConvertFuncs = map[descriptor.FieldDescriptorProto_Type]string{ - descriptor.FieldDescriptorProto_TYPE_DOUBLE: "convert.Float64P", - descriptor.FieldDescriptorProto_TYPE_FLOAT: "convert.Float32P", - descriptor.FieldDescriptorProto_TYPE_INT64: "convert.Int64P", - descriptor.FieldDescriptorProto_TYPE_UINT64: "convert.Uint64P", - descriptor.FieldDescriptorProto_TYPE_INT32: "convert.Int32P", - descriptor.FieldDescriptorProto_TYPE_FIXED64: "convert.Uint64P", - descriptor.FieldDescriptorProto_TYPE_FIXED32: "convert.Uint32P", - descriptor.FieldDescriptorProto_TYPE_BOOL: "convert.BoolP", - descriptor.FieldDescriptorProto_TYPE_STRING: "convert.StringP", + descriptor.FieldDescriptorProto_TYPE_DOUBLE: "runtime.Float64P", + descriptor.FieldDescriptorProto_TYPE_FLOAT: "runtime.Float32P", + descriptor.FieldDescriptorProto_TYPE_INT64: "runtime.Int64P", + descriptor.FieldDescriptorProto_TYPE_UINT64: "runtime.Uint64P", + descriptor.FieldDescriptorProto_TYPE_INT32: "runtime.Int32P", + descriptor.FieldDescriptorProto_TYPE_FIXED64: "runtime.Uint64P", + descriptor.FieldDescriptorProto_TYPE_FIXED32: "runtime.Uint32P", + descriptor.FieldDescriptorProto_TYPE_BOOL: "runtime.BoolP", + descriptor.FieldDescriptorProto_TYPE_STRING: "runtime.StringP", // FieldDescriptorProto_TYPE_GROUP // FieldDescriptorProto_TYPE_MESSAGE // FieldDescriptorProto_TYPE_BYTES // TODO(yugui) Handle bytes - descriptor.FieldDescriptorProto_TYPE_UINT32: "convert.Uint32P", + descriptor.FieldDescriptorProto_TYPE_UINT32: "runtime.Uint32P", // FieldDescriptorProto_TYPE_ENUM // TODO(yugui) Handle Enum - descriptor.FieldDescriptorProto_TYPE_SFIXED32: "convert.Int32P", - descriptor.FieldDescriptorProto_TYPE_SFIXED64: "convert.Int64P", - descriptor.FieldDescriptorProto_TYPE_SINT32: "convert.Int32P", - descriptor.FieldDescriptorProto_TYPE_SINT64: "convert.Int64P", + descriptor.FieldDescriptorProto_TYPE_SFIXED32: "runtime.Int32P", + descriptor.FieldDescriptorProto_TYPE_SFIXED64: "runtime.Int64P", + descriptor.FieldDescriptorProto_TYPE_SINT32: "runtime.Int32P", + descriptor.FieldDescriptorProto_TYPE_SINT64: "runtime.Int64P", } ) @@ -270,20 +270,25 @@ func generateSingleFile(file *descriptor.FileDescriptorProto) (string, error) { } needsBody := len(fields) != 0 if needsBody && (opts.GetMethod() == "GET" || opts.GetMethod() == "DELETE") { - return "", fmt.Errorf("needs request body even though http method is %s: %s", opts.Method, meth.GetName()) + return "", fmt.Errorf("needs request body even though http method is %s: %s", opts.GetMethod(), meth.GetName()) + } + if meth.GetClientStreaming() && (len(params) > 0 || !needsBody) { + return "", fmt.Errorf("cannot use path parameter in client streaming") } requestGoType := goTypeFromProtoType(meth.GetInputType()[1:], file.GetPackage()) if idx := strings.Index(requestGoType, "."); idx >= 0 { usedImports[requestGoType[:idx]] = true } md := methodDesc{ - ServiceName: svc.GetName(), - Name: meth.GetName(), - Method: opts.GetMethod(), - Path: opts.GetPath(), - RequestType: requestGoType, - PathParams: params, - NeedsBody: needsBody, + ServiceName: svc.GetName(), + Name: meth.GetName(), + Method: opts.GetMethod(), + Path: opts.GetPath(), + RequestType: requestGoType, + PathParams: params, + NeedsBody: needsBody, + ServerStreaming: meth.GetServerStreaming(), + ClientStreaming: meth.GetClientStreaming(), } sd.Methods = append(sd.Methods, md) } @@ -350,14 +355,16 @@ func (d paramDesc) GoName() string { } type methodDesc struct { - ServiceName string - Name string - Method string - Path string - RequestType string - QueryParams []paramDesc - PathParams []paramDesc - NeedsBody bool + ServiceName string + Name string + Method string + Path string + RequestType string + QueryParams []paramDesc + PathParams []paramDesc + NeedsBody bool + ClientStreaming bool + ServerStreaming bool } func (d methodDesc) MuxRegistererName() string { @@ -387,43 +394,96 @@ It translates gRPC into RESTful JSON APIs. package {{.Pkg}} import ( "encoding/json" - "fmt" + "io" "net/http" - "google.golang.org/grpc" - "github.com/gengo/grpc-gateway/convert" - "github.com/golang/protobuf/proto" + "github.com/gengo/grpc-gateway/runtime" "github.com/golang/glog" + "github.com/golang/protobuf/proto" "github.com/zenazn/goji/web" "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" {{range $line := .Imports}}{{$line}}{{end}} ) -var _ fmt.Stringer -var _ = convert.String +var _ codes.Code +var _ io.Reader +var _ = runtime.String `)) + handlerTemplate = template.Must(template.New("handler").Parse(` -func handle_{{.ServiceName}}_{{.Name}}(ctx context.Context, c web.C, client {{.ServiceName}}Client, req *http.Request) (msg proto.Message, err error) { - protoReq := new({{.RequestType}}) -{{if .NeedsBody}} - if err = json.NewDecoder(req.Body).Decode(&protoReq); err != nil { +{{if .ClientStreaming}} +{{template "client-streaming-request-func" .}} +{{else}} +{{template "client-rpc-request-func" .}} +{{end}} +`)) + + _ = template.Must(handlerTemplate.New("request-func-signature").Parse(strings.Replace(` +{{if .ServerStreaming}} +func request_{{.ServiceName}}_{{.Name}}(ctx context.Context, c web.C, client {{.ServiceName}}Client, req *http.Request) ({{.ServiceName}}_{{.Name}}Client, error) +{{else}} +func request_{{.ServiceName}}_{{.Name}}(ctx context.Context, c web.C, client {{.ServiceName}}Client, req *http.Request) (msg proto.Message, err error) +{{end}}`, "\n", "", -1))) + + _ = template.Must(handlerTemplate.New("client-streaming-request-func").Parse(` +{{template "request-func-signature" .}} { + stream, err := client.{{.Name}}(ctx) + if err != nil { + glog.Errorf("Failed to start streaming: %v", err) + return nil, err + } + dec := json.NewDecoder(req.Body) + for { + var protoReq {{.RequestType}} + err = dec.Decode(&protoReq) + if err == io.EOF { + break + } + if err != nil { + glog.Errorf("Failed to decode request: %v", err) + return nil, grpc.Errorf(codes.InvalidArgument, "%v", err) + } + if err = stream.Send(&protoReq); err != nil { + glog.Errorf("Failed to send request: %v", err) + return nil, err + } + } +{{if .ServerStreaming}} + if err = stream.CloseSend(); err != nil { + glog.Errorf("Failed to terminate client stream: %v", err) return nil, err } + return stream, nil +{{else}} + return stream.CloseAndRecv() {{end}} +} +`)) + + _ = template.Must(handlerTemplate.New("client-rpc-request-func").Parse(` +{{template "request-func-signature" .}} { + var protoReq {{.RequestType}} {{range $desc := .QueryParams}} protoReq.{{$desc.ProtoName}}, err = {{$desc.ConvertFunc}}(req.FormValue({{$desc.ProtoName | printf "%q"}})) if err != nil { return nil, err } {{end}} +{{if .NeedsBody}} + if err = json.NewDecoder(req.Body).Decode(&protoReq); err != nil { + return nil, err + } +{{end}} {{if .NeedsPathParam}} var val string var ok bool {{range $desc := .PathParams}} val, ok = c.URLParams[{{$desc.ProtoName | printf "%q"}}] if !ok { - return nil, fmt.Errorf("missing parameter %s", {{$desc.ProtoName | printf "%q"}}) + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", {{$desc.ProtoName | printf "%q"}}) } protoReq.{{$desc.GoName}}, err = {{$desc.ConvertFunc}}(val) if err != nil { @@ -431,12 +491,14 @@ func handle_{{.ServiceName}}_{{.Name}}(ctx context.Context, c web.C, client {{.S } {{end}} {{end}} - return client.{{.Name}}(ctx, protoReq) -} -`)) + + return client.{{.Name}}(ctx, &protoReq) +}`)) trailerTemplate = template.Must(template.New("trailer").Parse(` {{range $svc := .}} +// Register{{$svc.Name}}HandlerFromEndpoint is same as Register{{$svc.Name}}Handler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. func Register{{$svc.Name}}HandlerFromEndpoint(ctx context.Context, mux *web.Mux, endpoint string) (err error) { conn, err := grpc.Dial(endpoint) if err != nil { @@ -460,26 +522,22 @@ func Register{{$svc.Name}}HandlerFromEndpoint(ctx context.Context, mux *web.Mux, return Register{{$svc.Name}}Handler(ctx, mux, conn) } +// Register{{$svc.Name}}Handler registers the http handlers for service {{$svc.Name}} to "mux". +// The handlers forward requests to the grpc endpoint over "conn". func Register{{$svc.Name}}Handler(ctx context.Context, mux *web.Mux, conn *grpc.ClientConn) error { client := New{{$svc.Name}}Client(conn) {{range $m := $svc.Methods}} mux.{{$m.MuxRegistererName}}({{$m.Path | printf "%q"}}, func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_{{$m.ServiceName}}_{{$m.Name}}(ctx, c, client, req) + resp, err := request_{{.ServiceName}}_{{.Name}}(ctx, c, client, req) if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - buf, err := json.Marshal(resp) - if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + {{if .ServerStreaming}} + runtime.ForwardResponseStream(w, func() (proto.Message, error) { return resp.Recv() }) + {{else}} + runtime.ForwardResponseMessage(w, resp) + {{end}} }) {{end}} return nil diff --git a/protoc-gen-grpc-gateway/generator_test.go b/protoc-gen-grpc-gateway/generator_test.go index 1e30ca4b378..8e422ef46ad 100644 --- a/protoc-gen-grpc-gateway/generator_test.go +++ b/protoc-gen-grpc-gateway/generator_test.go @@ -99,48 +99,52 @@ package example import ( "encoding/json" - "fmt" + "io" "net/http" - "github.com/gengo/grpc-gateway/convert" + "github.com/gengo/grpc-gateway/runtime" "github.com/golang/glog" "github.com/golang/protobuf/proto" "github.com/zenazn/goji/web" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/codes" ) -var _ fmt.Stringer -var _ = convert.String +var _ codes.Code +var _ io.Reader +var _ = runtime.String -func handle_EchoService_Echo(ctx context.Context, c web.C, client EchoServiceClient, req *http.Request) (msg proto.Message, err error) { - protoReq := new(SimpleMessage) +func request_EchoService_Echo(ctx context.Context, c web.C, client EchoServiceClient, req *http.Request) (msg proto.Message, err error) { + var protoReq SimpleMessage var val string var ok bool val, ok = c.URLParams["id"] if !ok { - return nil, fmt.Errorf("missing parameter %s", "id") + return nil, grpc.Errorf(codes.InvalidArgument, "missing parameter %s", "id") } - protoReq.Id, err = convert.String(val) + protoReq.Id, err = runtime.String(val) if err != nil { return nil, err } - return client.Echo(ctx, protoReq) + return client.Echo(ctx, &protoReq) } -func handle_EchoService_EchoBody(ctx context.Context, c web.C, client EchoServiceClient, req *http.Request) (msg proto.Message, err error) { - protoReq := new(SimpleMessage) +func request_EchoService_EchoBody(ctx context.Context, c web.C, client EchoServiceClient, req *http.Request) (msg proto.Message, err error) { + var protoReq SimpleMessage if err = json.NewDecoder(req.Body).Decode(&protoReq); err != nil { return nil, err } - return client.EchoBody(ctx, protoReq) + return client.EchoBody(ctx, &protoReq) } +// RegisterEchoServiceHandlerFromEndpoint is same as RegisterEchoServiceHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. func RegisterEchoServiceHandlerFromEndpoint(ctx context.Context, mux *web.Mux, endpoint string) (err error) { conn, err := grpc.Dial(endpoint) if err != nil { @@ -164,45 +168,31 @@ func RegisterEchoServiceHandlerFromEndpoint(ctx context.Context, mux *web.Mux, e return RegisterEchoServiceHandler(ctx, mux, conn) } +// RegisterEchoServiceHandler registers the http handlers for service EchoService to "mux". +// The handlers forward requests to the grpc endpoint over "conn". func RegisterEchoServiceHandler(ctx context.Context, mux *web.Mux, conn *grpc.ClientConn) error { client := NewEchoServiceClient(conn) mux.Post("/v1/example/echo/:id", func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_EchoService_Echo(ctx, c, client, req) + resp, err := request_EchoService_Echo(ctx, c, client, req) if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - buf, err := json.Marshal(resp) - if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + + runtime.ForwardResponseMessage(w, resp) + }) mux.Post("/v1/example/echo_body", func(c web.C, w http.ResponseWriter, req *http.Request) { - resp, err := handle_EchoService_EchoBody(ctx, c, client, req) + resp, err := request_EchoService_EchoBody(ctx, c, client, req) if err != nil { - glog.Errorf("RPC error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) + runtime.HTTPError(w, err) return } - buf, err := json.Marshal(resp) - if err != nil { - glog.Errorf("Marshal error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/json") - if _, err = w.Write(buf); err != nil { - glog.Errorf("Failed to write response: %v", err) - } + + runtime.ForwardResponseMessage(w, resp) + }) return nil diff --git a/convert/convert.go b/runtime/convert.go similarity index 98% rename from convert/convert.go rename to runtime/convert.go index 8c804e5c23f..cc6b0da0366 100644 --- a/convert/convert.go +++ b/runtime/convert.go @@ -1,4 +1,4 @@ -package convert +package runtime import ( "strconv" diff --git a/runtime/doc.go b/runtime/doc.go new file mode 100644 index 00000000000..3efbaadda20 --- /dev/null +++ b/runtime/doc.go @@ -0,0 +1,5 @@ +/* +package runtime contains runtime helper functions used by +servers which protoc-gen-grpc-gateway generates. +*/ +package runtime diff --git a/runtime/errors.go b/runtime/errors.go new file mode 100644 index 00000000000..ebb0c877057 --- /dev/null +++ b/runtime/errors.go @@ -0,0 +1,60 @@ +package runtime + +import ( + "net/http" + + "github.com/golang/glog" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +// HTTPStatusFromCode converts a gRPC error code into the corresponding HTTP response status. +func HTTPStatusFromCode(code codes.Code) int { + switch code { + case codes.OK: + return http.StatusOK + case codes.Canceled: + return http.StatusRequestTimeout + case codes.Unknown: + return http.StatusInternalServerError + case codes.InvalidArgument: + return http.StatusBadRequest + case codes.DeadlineExceeded: + return http.StatusRequestTimeout + case codes.NotFound: + return http.StatusNotFound + case codes.AlreadyExists: + return http.StatusConflict + case codes.PermissionDenied: + return http.StatusForbidden + case codes.Unauthenticated: + return http.StatusUnauthorized + case codes.ResourceExhausted: + return http.StatusForbidden + case codes.FailedPrecondition: + return http.StatusPreconditionFailed + case codes.Aborted: + return http.StatusConflict + case codes.OutOfRange: + return http.StatusBadRequest + case codes.Unimplemented: + return http.StatusNotImplemented + case codes.Internal: + return http.StatusInternalServerError + case codes.Unavailable: + return http.StatusServiceUnavailable + case codes.DataLoss: + return http.StatusInternalServerError + } + + glog.Errorf("Unknown gRPC error code: %v", code) + return http.StatusInternalServerError +} + +// HTTPError replies to the request with the error. +// If "err" is an error from gRPC system, the function replies with the status code mapped by HTTPStatusFromCode. +// If otherwise, it replies with http.StatusInternalServerError. +func HTTPError(w http.ResponseWriter, err error) { + st := HTTPStatusFromCode(grpc.Code(err)) + http.Error(w, err.Error(), st) +} diff --git a/runtime/handler.go b/runtime/handler.go new file mode 100644 index 00000000000..928638c9364 --- /dev/null +++ b/runtime/handler.go @@ -0,0 +1,73 @@ +package runtime + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/golang/glog" + "github.com/golang/protobuf/proto" +) + +type responseStreamChunk struct { + Result proto.Message `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +// ForwardResponseStream forwards the stream from gRPC server to REST client. +func ForwardResponseStream(w http.ResponseWriter, recv func() (proto.Message, error)) { + f, ok := w.(http.Flusher) + if !ok { + glog.Errorf("Flush not supported in %T", w) + http.Error(w, "unexpected type of web server", http.StatusInternalServerError) + return + } + + w.Header().Set("Transfer-Encoding", "chunked") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + f.Flush() + for { + resp, err := recv() + if err == io.EOF { + return + } + if err != nil { + buf, merr := json.Marshal(responseStreamChunk{Error: err.Error()}) + if merr != nil { + glog.Error("Failed to marshal an error: %v", merr) + return + } + if _, werr := fmt.Fprintf(w, "%s\n", buf); werr != nil { + glog.Error("Failed to notify error to client: %v", werr) + return + } + return + } + buf, err := json.Marshal(responseStreamChunk{Result: resp}) + if err != nil { + glog.Error("Failed to marshal response chunk: %v", err) + return + } + if _, err = fmt.Fprintf(w, "%s\n", buf); err != nil { + glog.Error("Failed to send response chunk: %v", err) + return + } + } +} + +// ForwardResponseStream forwards the message from gRPC server to REST client. +func ForwardResponseMessage(w http.ResponseWriter, resp proto.Message) { + buf, err := json.Marshal(resp) + if err != nil { + glog.Errorf("Marshal error: %v", err) + HTTPError(w, err) + return + } + + w.Header().Set("Content-Type", "application/json") + if _, err = w.Write(buf); err != nil { + glog.Errorf("Failed to write response: %v", err) + } +} diff --git a/convert/proto2.go b/runtime/proto2_convert.go similarity index 98% rename from convert/proto2.go rename to runtime/proto2_convert.go index a31d40da580..c6e40680c03 100644 --- a/convert/proto2.go +++ b/runtime/proto2_convert.go @@ -1,4 +1,4 @@ -package convert +package runtime import ( "github.com/golang/protobuf/proto"