Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unclean shutdown when using single-peer with gRPC #2241

Open
Groxx opened this issue Jan 2, 2024 · 1 comment
Open

Unclean shutdown when using single-peer with gRPC #2241

Groxx opened this issue Jan 2, 2024 · 1 comment

Comments

@Groxx
Copy link
Contributor

Groxx commented Jan 2, 2024

When converting to some zaptest loggers in some internal tests, I started getting occasional test panics like:

panic: Log in goroutine after TestName has completed: 2024-01-02T19:16:29.853Z DEBUG   peer status change  {"status": "Unavailable", "peer": "127.0.0.1:1234", "transport": "grpc"}
goroutine 239 [running]:
testing.(*common).logDepth(0xc0007dc000, {0xc00021f500, 0x7a}, 0x3)
    GOROOT/src/testing/testing.go:1022 +0x4c5
testing.(*common).log(...)
    GOROOT/src/testing/testing.go:1004
testing.(*common).Logf(0xc0007dc000, {0x6aa777?, 0x18e30c5?}, {0xc0001ad3f0?, 0x32fa00?, 0x1?})
    GOROOT/src/testing/testing.go:1055 +0x54
go.uber.org/zap/zaptest.testingWriter.Write({{0x974b18?, 0xc0007dc000?}, 0xaa?}, {0xc0008f2c00?, 0x7b, 0xc0001ad3e0?})
    external/org_uber_go_zap/zaptest/logger.go:130 +0xdc
go.uber.org/zap/zapcore.(*ioCore).Write(0xc000708780, {0xff, {0xc15d362372e443f0, 0x687c687, 0x326cf60}, {0x0, 0x0}, {0x6c2592, 0x12}, {0x0, ...}, ...}, ...)
    external/org_uber_go_zap/zapcore/core.go:99 +0xb5
go.uber.org/zap/zapcore.(*CheckedEntry).Write(0xc0000280d0, {0xc0002a26c0, 0x3, 0x3})
    external/org_uber_go_zap/zapcore/entry.go:253 +0x1dc
go.uber.org/zap.(*Logger).Debug(0x0?, {0x6c2592?, 0xc0001a0640?}, {0xc0002a26c0, 0x3, 0x3})
    external/org_uber_go_zap/logger.go:238 +0x51
go.uber.org/yarpc/transport/grpc.(*grpcPeer).setConnectionStatus(0xc0007309c0, 0x0)
    external/org_uber_go_yarpc/transport/grpc/peer.go:98 +0x272
go.uber.org/yarpc/transport/grpc.(*grpcPeer).monitorConnectionStatus(0xc0007309c0)
    external/org_uber_go_yarpc/transport/grpc/peer.go:90 +0x95
created by go.uber.org/yarpc/transport/grpc.(*Transport).newPeer in goroutine 236
    external/org_uber_go_yarpc/transport/grpc/peer.go:73 +0x6cb

After digging around a bit, I can see we are using some single-peer-choosers with grpc, and:

So if shutdown calls peer.Single.Stop() and then grpc.Transport.Stop(), the peer will be removed after having only been told to stop(), and the transport's Stop() will not wait for it to stop its background goroutine.
I'm not 100% certain that shutdown occurs in this order (fx logs don't make that explicit), but it seems like it probably has to as peers are used in outbounds. Stop RPC == stop outbounds -> stop peers -> stop transports, right?


I'm not seeing any way to patch this from the outside, as the peer's instance and API doesn't seem to be exposed anywhere. Which is probably a good thing. So I think this has to be fixed internally.

As a possibly simple option: maybe grpc.Transport should just keep all stop-chans (remove the peer but not the chan in ReleasePeer) and wait on all of them during Stop()? It would leak empty chans unless some cleanup process was run, but if that's an issue then closed chans could probably be cleared out in ReleasePeer as a garbage collector.
Or should ReleasePeer just wait too? I'm not sure what the semantics are here, but it seems like it may be intentional that it doesn't wait.

I haven't carefully checked the other transports to see if they have similar issues, but e.g. http is sufficiently different that it doesn't obviously have the same problem.

@Groxx
Copy link
Contributor Author

Groxx commented Jan 2, 2024

Yea, applying this patch brings me from ~20% failure to zero after thousands of iterations:

diff --git a/transport/grpc/transport.go b/transport/grpc/transport.go
index 15d69460..2c7a2c32 100644
--- a/transport/grpc/transport.go
+++ b/transport/grpc/transport.go
@@ -39,6 +39,7 @@ type Transport struct {
 	once          *lifecycle.Once
 	options       *transportOptions
 	addressToPeer map[string]*grpcPeer
+	waitPeers     []*grpcPeer
 }

 // NewTransport returns a new Transport.
@@ -71,6 +72,9 @@ func (t *Transport) Stop() error {
 		for _, grpcPeer := range t.addressToPeer {
 			grpcPeer.wait()
 		}
+		for _, stoppedGrpcPeer := range t.waitPeers {
+			stoppedGrpcPeer.wait()
+		}
 		return nil
 	})
 }
@@ -144,6 +148,7 @@ func (t *Transport) ReleasePeer(pid peer.Identifier, ps peer.Subscriber) error {
 	if p.NumSubscribers() == 0 {
 		delete(t.addressToPeer, address)
 		p.stop()
+		t.waitPeers = append(t.waitPeers, p)
 	}
 	return nil
 }

I have no idea if this^ is worth using, I'm not familiar enough with the code/expectations in here. But it's an effective proof of concept at least.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

1 participant