Skip to content

Commit

Permalink
Replaced os.Exit with RemoteWeavelet Wait method. (#531)
Browse files Browse the repository at this point in the history
Recall that a RemoteWeavelet maintains (1) an RPC server to serve remote
component method calls from other weavelets and (2) a connection to the
envelope. Before this PR, a RemoteWeavelet would os.Exit if either (1)
or (2) went down. This made testing failure scenarios impossible, as the
process would simply exit.

This PR removes the calls to os.Exit. Instead, I added a Wait method to
the RemoteWeavelet that blocks until the weavelet has shut down and
returns any errors encountered during shutdown. With this new Wait
method, I was able to add a unit test to test the behavior of a weavelet
when its connection to the envelope breaks.
  • Loading branch information
mwhittaker authored Aug 21, 2023
1 parent 958fbf7 commit e11bbb2
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 48 deletions.
1 change: 1 addition & 0 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ github.com/ServiceWeaver/weaver/internal/weaver
go.opentelemetry.io/otel/semconv/v1.4.0
go.opentelemetry.io/otel/trace
golang.org/x/exp/maps
golang.org/x/sync/errgroup
google.golang.org/protobuf/types/known/timestamppb
log/slog
math
Expand Down
2 changes: 1 addition & 1 deletion internal/envelope/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func makeConnections(t *testing.T, handler conn.EnvelopeHandler) (*conn.Envelope
panic(err)
}
created <- struct{}{}
err = w.Serve(nil)
err = w.Serve(ctx, nil)
weaveletDone <- err
}()

Expand Down
11 changes: 9 additions & 2 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package conn

import (
"bytes"
"context"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -111,16 +112,22 @@ func NewWeaveletConn(r io.ReadCloser, w io.WriteCloser) (*WeaveletConn, error) {

// Serve accepts RPC requests from the envelope. Requests are handled serially
// in the order they are received.
func (w *WeaveletConn) Serve(h WeaveletHandler) error {
func (w *WeaveletConn) Serve(ctx context.Context, h WeaveletHandler) error {
go func() {
<-ctx.Done()
w.conn.cleanup(ctx.Err())
}()

msg := &protos.EnvelopeMsg{}
for {
for ctx.Err() == nil {
if err := w.conn.recv(msg); err != nil {
return err
}
if err := w.handleMessage(h, msg); err != nil {
return err
}
}
return ctx.Err()
}

// EnvelopeInfo returns the EnvelopeInfo received from the envelope.
Expand Down
72 changes: 54 additions & 18 deletions internal/testdeployer/remoteweavelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@ import (
"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)

// TODO(mwhittaker):
//
// - Kill the remote weavelet at the end of every test.
// - Create a weavelet and then break the connection.
// - Add a Wait method to RemoteWeavelet to wait for shutdown.
// - Hook up two weavelets then cancel one of them.
// - Update routing info with bad routing info.
// - Component with a failing Init method.
Expand All @@ -55,6 +53,7 @@ type deployer struct {
env *conn.EnvelopeConn // envelope
wlet *weaver.RemoteWeavelet // weavelet
logger *logging.TestLogger // logger
threads *errgroup.Group // background threads

// A unit test can override the following envelope methods to do things
// like inject errors or return invalid values.
Expand Down Expand Up @@ -159,6 +158,7 @@ func deploy(t *testing.T, ctx context.Context) *deployer {
func deployWithInfo(t *testing.T, ctx context.Context, info *protos.EnvelopeInfo) *deployer {
t.Helper()
ctx, cancel := context.WithCancel(ctx)
threads, ctx := errgroup.WithContext(ctx)

// Create pipes to the weavelet.
toWeaveletReader, toWeaveletWriter, err := os.Pipe()
Expand Down Expand Up @@ -209,15 +209,33 @@ func deployWithInfo(t *testing.T, ctx context.Context, info *protos.EnvelopeInfo
env: env,
wlet: wlet,
logger: logging.NewTestLogger(t, testing.Verbose()),
threads: threads,
}
t.Cleanup(d.shutdown)

// Monitor the envelope and weavelet in background threads. Discard errors
// after the context has been cancelled, as those are expected.
threads.Go(func() error {
if err := wlet.Wait(); err != nil && ctx.Err() == nil {
return err
}
return nil
})
threads.Go(func() error {
if err := env.Serve(d); err != nil && ctx.Err() == nil {
return err
}
return nil
})

return d
}

// shutdown shuts down a deployer and its weavelet.
func (d *deployer) shutdown() {
d.cancel()
// TODO(mwhittaker): Wait for the weavelet to exit.
if err := d.threads.Wait(); err != nil {
d.t.Fatal(err)
}
}

// testComponents tests that the components spawned by d are working properly.
Expand Down Expand Up @@ -291,6 +309,22 @@ func TestInvalidHandshake(t *testing.T) {
}
}

// TestClosePipes checks that the weavelet's Wait method reports an error once
// the pipes to the weavelet have been closed.
func TestClosePipes(t *testing.T) {
// Note that d.shutdown will fail because we close the pipes below, so we
// instead call d.cancel() and d.threads.Wait() directly.
//
// Deferred calls run in last-in-first-out order, so d.threads.Wait() runs
// before d.cancel(). The error returned by d.threads.Wait() is discarded.
// NOTE(review): deployWithInfo also registers d.shutdown with t.Cleanup;
// confirm that it does not fail this test after the pipes are closed.
d := deploy(t, context.Background())
defer d.cancel()
defer d.threads.Wait()
testComponents(d)

// Close the pipes to the weavelet. The weavelet should error out.
d.toWeaveletWriter.Close()
d.toEnvelopeReader.Close()
if err := d.wlet.Wait(); err == nil {
t.Fatal("unexpected success")
}
}

func TestLocalhostWeaveletAddress(t *testing.T) {
// Start the weavelet with internal address "localhost:12345".
d := deployWithInfo(t, context.Background(), &protos.EnvelopeInfo{
Expand All @@ -299,6 +333,7 @@ func TestLocalhostWeaveletAddress(t *testing.T) {
Id: uuid.New().String(),
InternalAddress: "localhost:12345",
})
defer d.shutdown()
got := d.env.WeaveletInfo().DialAddr
const want = "tcp://127.0.0.1:12345"
if got != want {
Expand Down Expand Up @@ -326,6 +361,7 @@ func TestHostnameWeaveletAddress(t *testing.T) {
Id: uuid.New().String(),
InternalAddress: fmt.Sprintf("%s:12345", ips[0]),
})
defer d.shutdown()
got := d.env.WeaveletInfo().DialAddr
want := fmt.Sprintf("tcp://%v:12345", ips[0])
if got != want {
Expand All @@ -335,14 +371,14 @@ func TestHostnameWeaveletAddress(t *testing.T) {

func TestErrorFreeExecution(t *testing.T) {
d := deploy(t, context.Background())
go d.env.Serve(d)
defer d.shutdown()
testComponents(d)
}

func TestFailActivateComponent(t *testing.T) {
ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()

// Fail ActivateComponent a number of times.
const n = 3
Expand Down Expand Up @@ -372,7 +408,7 @@ func TestFailGetListenerAddress(t *testing.T) {

ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()

// Fail GetListenerAddress a number of times.
const n = 3
Expand All @@ -393,7 +429,7 @@ func TestGetListenerAddressReturnsInvalidAddress(t *testing.T) {

ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()

// Return an invalid listener a number of times.
const n = 3
Expand Down Expand Up @@ -421,7 +457,7 @@ func TestGetListenerAddressReturnsAddressAlreadyInUse(t *testing.T) {

ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()

// Tell the weavelet to listen on port 45678 a number of times.
const n = 3
Expand All @@ -439,7 +475,7 @@ func TestGetListenerAddressReturnsAddressAlreadyInUse(t *testing.T) {
func TestFailExportListener(t *testing.T) {
ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()

// Fail ExportListener a number of times.
const n = 3
Expand All @@ -460,7 +496,7 @@ func TestExportListenerReturnsError(t *testing.T) {

ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()

// Return an error from ExportListener a number of times.
const n = 3
Expand All @@ -479,7 +515,7 @@ func TestExportListenerReturnsError(t *testing.T) {
func TestUpdateMissingComponents(t *testing.T) {
ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()

// Update the weavelet with components that don't exist.
components := &protos.UpdateComponentsRequest{Components: []string{"foo", "bar"}}
Expand All @@ -493,7 +529,7 @@ func TestUpdateMissingComponents(t *testing.T) {
func TestUpdateExistingComponents(t *testing.T) {
ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()
testComponents(d)

// Update the weavelet with components that have already been started.
Expand All @@ -514,7 +550,7 @@ func TestUpdateExistingComponents(t *testing.T) {
func TestUpdateNilRoutingInfo(t *testing.T) {
ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()

// Update the weavelet with a nil routing info.
routing := &protos.UpdateRoutingInfoRequest{}
Expand All @@ -528,7 +564,7 @@ func TestUpdateNilRoutingInfo(t *testing.T) {
func TestUpdateRoutingInfoMissingComponent(t *testing.T) {
ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()

// Update the weavelet with routing info for a component that doesn't
// exist.
Expand All @@ -548,7 +584,7 @@ func TestUpdateRoutingInfoMissingComponent(t *testing.T) {
func TestUpdateRoutingInfoNotStartedComponent(t *testing.T) {
ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()

// Update the weavelet with routing info for a component that hasn't
// started yet.
Expand All @@ -567,7 +603,7 @@ func TestUpdateRoutingInfoNotStartedComponent(t *testing.T) {
func TestUpdateLocalRoutingInfoWithNonLocal(t *testing.T) {
ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
defer d.shutdown()
testComponents(d)

// Update the weavelet with non-local routing info for a component, even
Expand Down
40 changes: 23 additions & 17 deletions internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"log/slog"
"net"
"os"
"reflect"
"strings"
"sync"
Expand All @@ -40,6 +39,7 @@ import (
"github.com/ServiceWeaver/weaver/runtime/retry"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
)

// readyMethodKey holds the key for a method used to check if a backend is ready.
Expand All @@ -56,6 +56,7 @@ type RemoteWeaveletOptions struct {
// the single process deployer.
type RemoteWeavelet struct {
ctx context.Context // shuts down the weavelet when canceled
servers *errgroup.Group // background servers
opts RemoteWeaveletOptions // options
conn *conn.WeaveletConn // connection to envelope
syslogger *slog.Logger // system logger
Expand Down Expand Up @@ -112,8 +113,10 @@ type listener struct {
// specified in the provided registrations. bootstrap is used to establish a
// connection with an envelope.
func NewRemoteWeavelet(ctx context.Context, regs []*codegen.Registration, bootstrap runtime.Bootstrap, opts RemoteWeaveletOptions) (*RemoteWeavelet, error) {
servers, ctx := errgroup.WithContext(ctx)
w := &RemoteWeavelet{
ctx: ctx,
servers: servers,
opts: opts,
componentsByName: map[string]*component{},
componentsByIntf: map[reflect.Type]*component{},
Expand Down Expand Up @@ -173,26 +176,40 @@ func NewRemoteWeavelet(ctx context.Context, regs []*codegen.Registration, bootst
}

// Serve deployer API requests on the weavelet conn.
runAndDie(ctx, "serve weavelet conn", func() error {
return w.conn.Serve(w)
servers.Go(func() error {
if err := w.conn.Serve(ctx, w); err != nil {
w.syslogger.Error("weavelet conn failed", "err", err)
return err
}
return nil
})

// Serve RPC requests from other weavelets.
server := &server{Listener: w.conn.Listener(), wlet: w}
runAndDie(w.ctx, "handle calls", func() error {
servers.Go(func() error {
server := &server{Listener: w.conn.Listener(), wlet: w}
opts := call.ServerOptions{
Logger: w.syslogger,
Tracer: w.tracer,
InlineHandlerDuration: 20 * time.Microsecond,
WriteFlattenLimit: 4 << 10,
}
return call.Serve(w.ctx, server, opts)
if err := call.Serve(w.ctx, server, opts); err != nil {
w.syslogger.Error("RPC server failed", "err", err)
return err
}
return nil
})

w.syslogger.Debug(fmt.Sprintf("🧶 weavelet started on %s", w.conn.WeaveletInfo().DialAddr))
return w, nil
}

// Wait blocks until the background servers registered in w.servers have
// returned. These include the weavelet conn server and the RPC server started
// by NewRemoteWeavelet. This happens, for example, after the weavelet's
// context has been cancelled or after one of the servers fails. Wait returns
// the result of w.servers.Wait(), which for an errgroup.Group is the first
// non-nil error returned by any of its goroutines, or nil if none failed.
func (w *RemoteWeavelet) Wait() error {
return w.servers.Wait()
}

// GetIntf implements the Weavelet interface.
func (w *RemoteWeavelet) GetIntf(t reflect.Type) (any, error) {
return w.getIntf(t, "root")
Expand Down Expand Up @@ -723,17 +740,6 @@ func (s *server) handlers(components []string) (*call.HandlerMap, error) {
return hm, nil
}

// runAndDie runs fn in the background. Errors are fatal unless ctx has been
// canceled.
func runAndDie(ctx context.Context, msg string, fn func() error) {
go func() {
if err := fn(); err != nil && ctx.Err() == nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", msg, err)
os.Exit(1)
}
}()
}

// waitUntilReady blocks until a successful call to the "ready" method is made
// on the provided client.
func waitUntilReady(ctx context.Context, client call.Connection) error {
Expand Down
4 changes: 2 additions & 2 deletions runtime/envelope/envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestMain(m *testing.M) {
return nil
},
"writetraces": func() error { return writeTraces(conn) },
"serve_conn": func() error { return conn.Serve(nil) },
"serve_conn": func() error { return conn.Serve(context.Background(), nil) },
}
fn, ok := cmds[cmd]
if !ok {
Expand All @@ -99,7 +99,7 @@ func TestMain(m *testing.M) {
fmt.Fprintf(os.Stderr, "subprocess: %v\n", err)
os.Exit(1)
}
conn.Serve(nil)
conn.Serve(context.Background(), nil)
}

var err error
Expand Down
4 changes: 1 addition & 3 deletions weaver.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ func runRemote[T any, _ PointerToMain[T]](ctx context.Context, app func(context.
}
return app(ctx, main.(*T))
}

<-ctx.Done()
return ctx.Err()
return runner.Wait()
}

// Implements[T] is a type that is embedded inside a component
Expand Down
Loading

0 comments on commit e11bbb2

Please sign in to comment.