Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jhttp: Re-work the Bridge interface to require less plumbing. #52

Merged
merged 2 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions cmd/examples/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/creachadair/jrpc2/handler"
"github.com/creachadair/jrpc2/jhttp"
"github.com/creachadair/jrpc2/metrics"
"github.com/creachadair/jrpc2/server"
)

var port = flag.Int("port", 0, "Service port")
Expand All @@ -34,16 +33,17 @@ func main() {
}

// Start a local server with a single trivial method and bridge it to HTTP.
local := server.NewLocal(handler.Map{
srv := jrpc2.NewServer(handler.Map{
"Ping": handler.New(func(ctx context.Context, msg ...string) string {
return "OK: " + strings.Join(msg, ", ")
}),
}, &server.LocalOptions{
Server: &jrpc2.ServerOptions{
Logger: log.New(os.Stderr, "[jhttp.Bridge] ", log.LstdFlags|log.Lshortfile),
Metrics: metrics.New(),
},
}, &jrpc2.ServerOptions{
Logger: log.New(os.Stderr, "[jhttp.Bridge] ", log.LstdFlags|log.Lshortfile),
Metrics: metrics.New(),
})
http.Handle("/rpc", jhttp.NewBridge(local.Client))
bridge := jhttp.NewBridge(srv, nil)
defer bridge.Close()

http.Handle("/rpc", bridge)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
}
122 changes: 50 additions & 72 deletions jhttp/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
package jhttp

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"

"github.com/creachadair/jrpc2"
"github.com/creachadair/jrpc2/channel"
)

// A Bridge is a http.Handler that bridges requests to a JSON-RPC client.
// A Bridge is a http.Handler that bridges requests to a JSON-RPC server.
//
// The body of the HTTP POST request must contain the complete JSON-RPC request
// message, encoded with Content-Type: application/json. Either a single
Expand All @@ -26,20 +25,19 @@ import (
// If the HTTP request method is not "POST", the bridge reports 405 (Method Not
// Allowed). If the Content-Type is not application/json, the bridge reports
// 415 (Unsupported Media Type).
//
// The bridge attaches the inbound HTTP request to the context passed to the
// client, allowing an EncodeContext callback to retrieve state from the HTTP
// headers. Use jhttp.HTTPRequest to retrieve the request from the context.
type Bridge struct {
cli *jrpc2.Client
ch channel.Channel
srv *jrpc2.Server
checkType func(string) bool
}

// ServeHTTP implements the required method of http.Handler.
func (b *Bridge) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (b Bridge) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
} else if req.Header.Get("Content-Type") != "application/json" {
}
if !b.checkType(req.Header.Get("Content-Type")) {
w.WriteHeader(http.StatusUnsupportedMediaType)
return
}
Expand All @@ -49,70 +47,38 @@ func (b *Bridge) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}

func (b *Bridge) serveInternal(w http.ResponseWriter, req *http.Request) error {
func (b Bridge) serveInternal(w http.ResponseWriter, req *http.Request) error {
body, err := ioutil.ReadAll(req.Body)
if err != nil {
return err
}

// The HTTP request requires a response, but the server will not reply if
// all the requests are notifications. Check whether we have any calls
// needing a response, and choose whether to wait for a reply based on that.
jreq, err := jrpc2.ParseRequests(body)
if err != nil {
return err
}

// Because the bridge shares the JSON-RPC client between potentially many
// HTTP clients, we must virtualize the ID space for requests to preserve
// the HTTP client's assignment of IDs.
//
// To do this, we keep track of the inbound ID for each request so that we
// can map the responses back. This takes advantage of the fact that the
// *jrpc2.Client detangles batch order so that responses come back in the
// same order (modulo notifications) even if the server response did not
// preserve order.

// Generate request specifications for the client.
var inboundID []string // for requests
spec := make([]jrpc2.Spec, len(jreq)) // requests & notifications
for i, req := range jreq {
spec[i] = jrpc2.Spec{
Method: req.Method(),
Notify: req.IsNotification(),
}
if req.HasParams() {
var p json.RawMessage
req.UnmarshalParams(&p)
spec[i].Params = p
}
if !spec[i].Notify {
inboundID = append(inboundID, req.ID())
var hasCall bool
for _, req := range jreq {
if !req.IsNotification() {
hasCall = true
break
}
}

ctx := context.WithValue(req.Context(), httpReqKey{}, req)
rsps, err := b.cli.Batch(ctx, spec)
if err != nil {
if err := b.ch.Send(body); err != nil {
return err
}

// If all the requests were notifications, report success without responses.
if len(rsps) == 0 {
// If there are only notifications, report success without responses.
if !hasCall {
w.WriteHeader(http.StatusNoContent)
return nil
}

// Otherwise, map the responses back to their original IDs, and marshal the
// response back into the body.
for i, rsp := range rsps {
rsp.SetID(inboundID[i])
}

// If the original request was a single message, make sure we encode the
// response the same way.
var reply []byte
if len(rsps) == 1 && (len(body) == 0 || body[0] != '[') {
reply, err = json.Marshal(rsps[0])
} else {
reply, err = json.Marshal(rsps)
}
// Wait for the server to reply.
reply, err := b.ch.Recv()
if err != nil {
return err
}
Expand All @@ -122,23 +88,35 @@ func (b *Bridge) serveInternal(w http.ResponseWriter, req *http.Request) error {
return nil
}

// Close shuts down the client associated with b and reports the result from
// its Close method.
func (b *Bridge) Close() error { return b.cli.Close() }
// Close closes the channel to the server, waits for the server to exit, and
// reports the exit status of the server.
func (b Bridge) Close() error { b.ch.Close(); return b.srv.Wait() }

// NewBridge constructs a new Bridge that dispatches requests through c. It is
// safe for the caller to continue to use c concurrently with the bridge, as
// long as it does not close the client.
func NewBridge(c *jrpc2.Client) *Bridge { return &Bridge{cli: c} }
// NewBridge starts srv constructs a new Bridge that dispatches HTTP requests
// to it. The server must be unstarted, or NewBridge will panic. The server
// will run until the bridge is closed.
func NewBridge(srv *jrpc2.Server, opts *BridgeOptions) Bridge {
cch, sch := channel.Direct()
return Bridge{
ch: cch,
srv: srv.Start(sch),
checkType: opts.checkContentType(),
}
}

type httpReqKey struct{}
// BridgeOptions are optional settings for a Bridge. A nil pointer is ready for
// use and provides default values as described.
type BridgeOptions struct {
// If non-nil, this function is called to check whether the HTTP request's
// declared content-type is valid. If this function returns false, the
// request is rejected. If nil, the default check requires a content type of
// "application/json".
CheckContentType func(contentType string) bool
}

// HTTPRequest returns the HTTP request associated with ctx, or nil. The
// context passed to the JSON-RPC client by the Bridge will contain this value.
func HTTPRequest(ctx context.Context) *http.Request {
req, ok := ctx.Value(httpReqKey{}).(*http.Request)
if ok {
return req
func (o *BridgeOptions) checkContentType() func(string) bool {
if o == nil || o.CheckContentType == nil {
return func(ctype string) bool { return ctype == "application/json" }
}
return nil
return o.CheckContentType
}
7 changes: 3 additions & 4 deletions jhttp/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,20 @@ import (
"net/http/httptest"
"strings"

"github.com/creachadair/jrpc2"
"github.com/creachadair/jrpc2/handler"
"github.com/creachadair/jrpc2/jhttp"
"github.com/creachadair/jrpc2/server"
)

func Example() {
// Set up a local server to demonstrate the API.
loc := server.NewLocal(handler.Map{
srv := jrpc2.NewServer(handler.Map{
"Test": handler.New(func(ctx context.Context, ss ...string) (string, error) {
return strings.Join(ss, " "), nil
}),
}, nil)
defer loc.Close()

b := jhttp.NewBridge(loc.Client)
b := jhttp.NewBridge(srv, nil)
defer b.Close()

hsrv := httptest.NewServer(b)
Expand Down
Loading