Skip to content

Commit

Permalink
Restore request virtualization in the HTTP bridge. (#58)
Browse files Browse the repository at this point in the history
This commit approximately reverts the interface changes to the HTTP Bridge
merged with #52. While those changes did simplify the implementation, they
did so at the cost of usability: Unvirtualized request IDs make it hard for clients
to avoid ID collisions, and the low-level channel interface made it impossible for
the client to access the HTTP request context for authentication.

While this is yet another breaking change to the Bridge interface, it fixes the
problems mentioned in #57, and the resulting interface is a little cleaner.

The main changes here are:

* Update the Bridge API to allow both client and server to be configured.
* Restore ID virtualization.
* Restore the SetID method to *Response.
* Restore and update the tests.
* Restore and improve the HTTP example code.
  • Loading branch information
creachadair authored Sep 27, 2021
1 parent 71ebbad commit a08254e
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 54 deletions.
3 changes: 3 additions & 0 deletions base.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ type Response struct {
// ID returns the request identifier for r.
func (r *Response) ID() string { return r.id }

// SetID sets the ID of r to s, for use in proxies.
func (r *Response) SetID(s string) { r.id = s }

// Error returns a non-nil *Error if the response contains an error.
func (r *Response) Error() *Error { return r.err }

Expand Down
32 changes: 17 additions & 15 deletions cmd/examples/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
// Usage (see also the client example):
//
// go build github.com/creachadair/jrpc2/cmd/examples/http
// ./http -port 8080
// ./http -listen :8080
//
// The server accepts RPCs on http://localhost:<port>/rpc.
package main

import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
Expand All @@ -24,26 +23,29 @@ import (
"github.com/creachadair/jrpc2/metrics"
)

var port = flag.Int("port", 0, "Service port")
var listenAddr = flag.String("listen", "", "Service address")

func main() {
flag.Parse()
if *port <= 0 {
log.Fatal("You must provide a positive -port to listen on")
if *listenAddr == "" {
log.Fatal("You must provide a non-empty -listen address")
}

// Start a local server with a single trivial method and bridge it to HTTP.
srv := jrpc2.NewServer(handler.Map{
"Ping": handler.New(func(ctx context.Context, msg ...string) string {
return "OK: " + strings.Join(msg, ", ")
}),
}, &jrpc2.ServerOptions{
Logger: log.New(os.Stderr, "[jhttp.Bridge] ", log.LstdFlags|log.Lshortfile),
Metrics: metrics.New(),
// Start an HTTP bridge with a single trivial method.
bridge := jhttp.NewBridge(handler.Map{
"Ping": handler.New(ping),
}, &jhttp.BridgeOptions{
Server: &jrpc2.ServerOptions{
Logger: log.New(os.Stderr, "[jhttp.Bridge] ", log.LstdFlags|log.Lshortfile),
Metrics: metrics.New(),
},
})
bridge := jhttp.NewBridge(srv, nil)
defer bridge.Close()

http.Handle("/rpc", bridge)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
log.Fatal(http.ListenAndServe(*listenAddr, nil))
}

func ping(ctx context.Context, msg ...string) string {
return "OK: " + strings.Join(msg, "|")
}
121 changes: 98 additions & 23 deletions jhttp/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
package jhttp

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"

"github.com/creachadair/jrpc2"
"github.com/creachadair/jrpc2/channel"
"github.com/creachadair/jrpc2/server"
)

// A Bridge is a http.Handler that bridges requests to a JSON-RPC server.
Expand All @@ -25,9 +28,12 @@ import (
// If the HTTP request method is not "POST", the bridge reports 405 (Method Not
// Allowed). If the Content-Type is not application/json, the bridge reports
// 415 (Unsupported Media Type).
//
// The bridge attaches the inbound HTTP request to the context passed to the
// client, allowing an EncodeContext callback to retrieve state from the HTTP
// headers. Use jhttp.HTTPRequest to retrieve the request from the context.
type Bridge struct {
ch channel.Channel
srv *jrpc2.Server
local server.Local
checkType func(string) bool
}

Expand Down Expand Up @@ -64,25 +70,62 @@ func (b Bridge) serveInternal(w http.ResponseWriter, req *http.Request) error {
if err != nil && err != jrpc2.ErrInvalidVersion {
return err
}
var hasCall bool
for _, req := range jreq {
if !req.IsNotification() {
hasCall = true
break

// Because the bridge shares the JSON-RPC client between potentially many
// HTTP clients, we must virtualize the ID space for requests to preserve
// the HTTP client's assignment of IDs.
//
// To do this, we keep track of the inbound ID for each request so that we
// can map the responses back. This takes advantage of the fact that the
// *jrpc2.Client detangles batch order so that responses come back in the
// same order (modulo notifications) even if the server response did not
// preserve order.

// Generate request specifications for the client.
var inboundID []string // for requests
spec := make([]jrpc2.Spec, len(jreq)) // requests & notifications
for i, req := range jreq {
spec[i] = jrpc2.Spec{
Method: req.Method(),
Notify: req.IsNotification(),
}
if req.HasParams() {
var p json.RawMessage
req.UnmarshalParams(&p)
spec[i].Params = p
}
if !spec[i].Notify {
inboundID = append(inboundID, req.ID())
}
}
if err := b.ch.Send(body); err != nil {

// Attach the HTTP request to the client context, so the encoder can see it.
ctx := context.WithValue(req.Context(), httpReqKey{}, req)
rsps, err := b.local.Client.Batch(ctx, spec)
if err != nil {
return err
}

// If there are only notifications, report success without responses.
if !hasCall {
// If all the requests were notifications, report success without responses.
if len(rsps) == 0 {
w.WriteHeader(http.StatusNoContent)
return nil
}

// Wait for the server to reply.
reply, err := b.ch.Recv()
// Otherwise, map the responses back to their original IDs, and marshal the
// response back into the body.
for i, rsp := range rsps {
rsp.SetID(inboundID[i])
}

// If the original request was a single message, make sure we encode the
// response the same way.
var reply []byte
if len(rsps) == 1 && !bytes.HasPrefix(bytes.TrimSpace(body), []byte("[")) {
reply, err = json.Marshal(rsps[0])
} else {
reply, err = json.Marshal(rsps)
}
if err != nil {
return err
}
Expand All @@ -93,34 +136,66 @@ func (b Bridge) serveInternal(w http.ResponseWriter, req *http.Request) error {
}

// Close closes the channel to the server, waits for the server to exit, and
// reports the exit status of the server.
func (b Bridge) Close() error { b.ch.Close(); return b.srv.Wait() }

// NewBridge constructs a new Bridge that starts srv and dispatches HTTP
// requests to it. The server must be unstarted or NewBridge will panic.
// The server will run until the bridge is closed.
func NewBridge(srv *jrpc2.Server, opts *BridgeOptions) Bridge {
cch, sch := channel.Direct()
// reports its exit status.
func (b Bridge) Close() error { return b.local.Close() }

// NewBridge constructs a new Bridge that starts a server on mux and dispatches
// HTTP requests to it. The server will run until the bridge is closed.
func NewBridge(mux jrpc2.Assigner, opts *BridgeOptions) Bridge {
return Bridge{
ch: cch,
srv: srv.Start(sch),
local: server.NewLocal(mux, &server.LocalOptions{
Client: opts.clientOptions(),
Server: opts.serverOptions(),
}),
checkType: opts.checkContentType(),
}
}

// BridgeOptions are optional settings for a Bridge. A nil pointer is ready for
// use and provides default values as described.
type BridgeOptions struct {
// Options for the bridge client (default nil).
Client *jrpc2.ClientOptions

// Options for the bridge server (default nil).
Server *jrpc2.ServerOptions

// If non-nil, this function is called to check whether the HTTP request's
// declared content-type is valid. If this function returns false, the
// request is rejected. If nil, the default check requires a content type of
// "application/json".
CheckContentType func(contentType string) bool
}

func (o *BridgeOptions) clientOptions() *jrpc2.ClientOptions {
if o == nil {
return nil
}
return o.Client
}

func (o *BridgeOptions) serverOptions() *jrpc2.ServerOptions {
if o == nil {
return nil
}
return o.Server
}

func (o *BridgeOptions) checkContentType() func(string) bool {
if o == nil || o.CheckContentType == nil {
return func(ctype string) bool { return ctype == "application/json" }
}
return o.CheckContentType
}

type httpReqKey struct{}

// HTTPRequest returns the HTTP request associated with ctx, or nil. The
// context passed to the JSON-RPC client by the Bridge will contain this value.
func HTTPRequest(ctx context.Context) *http.Request {
req, ok := ctx.Value(httpReqKey{}).(*http.Request)
if ok {
return req
}
return nil
}
7 changes: 2 additions & 5 deletions jhttp/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,17 @@ import (
"net/http/httptest"
"strings"

"github.com/creachadair/jrpc2"
"github.com/creachadair/jrpc2/handler"
"github.com/creachadair/jrpc2/jhttp"
)

func Example() {
// Set up a local server to demonstrate the API.
srv := jrpc2.NewServer(handler.Map{
// Set up a bridge to demonstrate the API.
b := jhttp.NewBridge(handler.Map{
"Test": handler.New(func(ctx context.Context, ss ...string) (string, error) {
return strings.Join(ss, " "), nil
}),
}, nil)

b := jhttp.NewBridge(srv, nil)
defer b.Close()

hsrv := httptest.NewServer(b)
Expand Down
25 changes: 14 additions & 11 deletions jhttp/jhttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jhttp_test
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -25,12 +26,18 @@ var testService = handler.Map{
}),
}

func TestBridge(t *testing.T) {
// Set up a JSON-RPC server to answer requests bridged from HTTP.
srv := jrpc2.NewServer(testService, nil)
func checkContext(ctx context.Context, _ string, p json.RawMessage) (json.RawMessage, error) {
if jhttp.HTTPRequest(ctx) == nil {
return nil, errors.New("no HTTP request in context")
}
return p, nil
}

// Bridge HTTP to the JSON-RPC server.
b := jhttp.NewBridge(srv, nil)
func TestBridge(t *testing.T) {
// Set up a bridge with the test configuration.
b := jhttp.NewBridge(testService, &jhttp.BridgeOptions{
Client: &jrpc2.ClientOptions{EncodeContext: checkContext},
})
defer checkClose(t, b)

// Create an HTTP test server to call into the bridge.
Expand Down Expand Up @@ -155,9 +162,7 @@ func TestBridge(t *testing.T) {

// Verify that the content-type check hook works.
func TestBridge_contentTypeCheck(t *testing.T) {
srv := jrpc2.NewServer(testService, nil)

b := jhttp.NewBridge(srv, &jhttp.BridgeOptions{
b := jhttp.NewBridge(testService, &jhttp.BridgeOptions{
CheckContentType: func(ctype string) bool {
return ctype == "application/octet-stream"
},
Expand Down Expand Up @@ -190,9 +195,7 @@ func TestBridge_contentTypeCheck(t *testing.T) {
}

func TestChannel(t *testing.T) {
srv := jrpc2.NewServer(testService, nil)

b := jhttp.NewBridge(srv, nil)
b := jhttp.NewBridge(testService, nil)
defer checkClose(t, b)
hsrv := httptest.NewServer(b)
defer hsrv.Close()
Expand Down

0 comments on commit a08254e

Please sign in to comment.