From 6b8ce78e4c8fa3709cb75219d71a61fa111c4457 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Fri, 22 Nov 2024 11:41:24 +0530 Subject: [PATCH 01/10] examples/features/gracefulstop: add example to demonstrate server graceful stop --- examples/features/gracefulstop/README.md | 31 +++++++ examples/features/gracefulstop/client/main.go | 67 +++++++++++++++ examples/features/gracefulstop/server/main.go | 86 +++++++++++++++++++ 3 files changed, 184 insertions(+) create mode 100644 examples/features/gracefulstop/README.md create mode 100644 examples/features/gracefulstop/client/main.go create mode 100644 examples/features/gracefulstop/server/main.go diff --git a/examples/features/gracefulstop/README.md b/examples/features/gracefulstop/README.md new file mode 100644 index 000000000000..a336982ff329 --- /dev/null +++ b/examples/features/gracefulstop/README.md @@ -0,0 +1,31 @@ +# Graceful Stop + +This example demonstrates how to gracefully stop a gRPC server using +Server.GracefulStop(). + +## How to run + +Start the server which will listen to incoming gRPC requests as well as OS +interrupt signals (SIGINT/SIGTERM). After receiving interrupt signal, it calls +`s.GracefulStop()` to gracefully shut down the server. If graceful shutdown +doesn't happen in time, server is stopped forcefully. + +```sh +$ go run server/main.go +``` + +In a separate terminal, start the client which will send multiple requests to +the server with some delay between each request. + +```sh +$ go run client/main.go +``` + +Use Ctrl+C or SIGTERM to signal the server to shut down. + +The server begins a graceful stop: +- It finishes ongoing requests. +- Rejects new incoming requests. + +The client will notice the server's shutdown when a request fails. + diff --git a/examples/features/gracefulstop/client/main.go b/examples/features/gracefulstop/client/main.go new file mode 100644 index 000000000000..4ce401513200 --- /dev/null +++ b/examples/features/gracefulstop/client/main.go @@ -0,0 +1,67 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Binary client demonstrates sending multiple requests to server and observe +// graceful stop. +package main + +import ( + "context" + "flag" + "log" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + pb "google.golang.org/grpc/examples/helloworld/helloworld" + "google.golang.org/grpc/status" +) + +var addr = flag.String("addr", "localhost:50052", "the address to connect to") + +func main() { + flag.Parse() + + // Set up a connection to the server. + conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + c := pb.NewGreeterClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + for i := 1; i <= 10; i++ { + log.Printf("Calling SayHello %d time", i) + r, err := c.SayHello(ctx, &pb.HelloRequest{}) + if err != nil { + if status.Code(err) != codes.InvalidArgument { + log.Printf("Received unexpected error: %v", err) + continue + } + log.Printf("Received error: %v", err) + continue + } + log.Printf("Received response: %s", r.Message) + time.Sleep(time.Second) + } + + log.Printf("Client finished interaction with server.") +} diff --git a/examples/features/gracefulstop/server/main.go b/examples/features/gracefulstop/server/main.go new file mode 100644 index 000000000000..d94b5ab91e3d --- /dev/null +++ b/examples/features/gracefulstop/server/main.go @@ -0,0 +1,86 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Binary server demonstrates how to gracefully stop a gRPC server. +package main + +import ( + "context" + "flag" + "fmt" + "log" + "net" + "os" + "os/signal" + "syscall" + "time" + + "google.golang.org/grpc" + pb "google.golang.org/grpc/examples/helloworld/helloworld" +) + +var port = flag.Int("port", 50052, "port number") + +// server is used to implement helloworld.GreeterServer. +type server struct { + pb.UnimplementedGreeterServer +} + +// SayHello implements helloworld.GreeterServer. +func (s *server) SayHello(_ context.Context, _ *pb.HelloRequest) (*pb.HelloReply, error) { + return &pb.HelloReply{Message: "Hello"}, nil +} + +func main() { + flag.Parse() + + address := fmt.Sprintf(":%v", *port) + lis, err := net.Listen("tcp", address) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + + // Create a channel to listen for OS signals. + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + + s := grpc.NewServer() + pb.RegisterGreeterServer(s, &server{}) + + go func() { + // Wait for an OS signal for graceful shutdown. + <-stop + fmt.Println("Shutting down server...") + s.GracefulStop() + close(stop) + }() + + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + select { + case <-stop: + log.Printf("Server stopped gracefully") + case <-ctx.Done(): + log.Printf("Graceful stop timeout reached. Forcing server stop.") + s.Stop() // Forceful stop + } +} From 5ee5e3b626d4d22f2f346086bc5bfaefdecb1af6 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Sat, 23 Nov 2024 23:18:51 +0530 Subject: [PATCH 02/10] use ServerStreaming scenario to demonstrate graceful shutdown --- examples/features/gracefulstop/server/main.go | 67 +++++++++++++------ 1 file changed, 45 insertions(+), 22 deletions(-) diff --git a/examples/features/gracefulstop/server/main.go b/examples/features/gracefulstop/server/main.go index d94b5ab91e3d..8c3f5bb9ebbb 100644 --- a/examples/features/gracefulstop/server/main.go +++ b/examples/features/gracefulstop/server/main.go @@ -19,30 +19,61 @@ package main import ( - "context" "flag" "fmt" "log" "net" "os" "os/signal" + "sync" + "sync/atomic" "syscall" "time" "google.golang.org/grpc" - pb "google.golang.org/grpc/examples/helloworld/helloworld" + pb "google.golang.org/grpc/examples/features/proto/echo" ) -var port = flag.Int("port", 50052, "port number") +var ( + port = flag.Int("port", 50052, "port number") + streamMessages int32 + mu sync.Mutex +) -// server is used to implement helloworld.GreeterServer. type server struct { - pb.UnimplementedGreeterServer + pb.UnimplementedEchoServer } -// SayHello implements helloworld.GreeterServer. -func (s *server) SayHello(_ context.Context, _ *pb.HelloRequest) (*pb.HelloReply, error) { - return &pb.HelloReply{Message: "Hello"}, nil +// ServerStreamingEcho implements the EchoService.ServerStreamingEcho method. +// It receives an EchoRequest and sends back a stream of EchoResponses. +// The stream will contain up to 5 messages, each sent after a 1-second delay. +// If the client cancels the request or if more than 5 messages are sent, +// the stream will be closed with an error. +func (s *server) ServerStreamingEcho(_ *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error { + ctx := stream.Context() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + atomic.AddInt32(&streamMessages, 1) + + mu.Lock() + + if streamMessages > 5 { + return fmt.Errorf("request failed") + } + + if err := stream.Send(&pb.EchoResponse{Message: fmt.Sprintf("Messages Sent: %d", streamMessages)}); err != nil { + return err + } + + mu.Unlock() + + time.Sleep(1 * time.Second) + } + } } func main() { @@ -59,28 +90,20 @@ func main() { signal.Notify(stop, os.Interrupt, syscall.SIGTERM) s := grpc.NewServer() - pb.RegisterGreeterServer(s, &server{}) + pb.RegisterEchoServer(s, &server{}) go func() { // Wait for an OS signal for graceful shutdown. <-stop - fmt.Println("Shutting down server...") + timer := time.AfterFunc(10*time.Second, func() { // forceful stop after 10 seconds + log.Printf("Graceful shutdown did not complete within 10 seconds. Forcing shutdown...") + s.Stop() + }) + defer timer.Stop() s.GracefulStop() - close(stop) }() if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - select { - case <-stop: - log.Printf("Server stopped gracefully") - case <-ctx.Done(): - log.Printf("Graceful stop timeout reached. Forcing server stop.") - s.Stop() // Forceful stop - } } From d48bd1fc6ba28148796278bd8301701b1746355b Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Sat, 23 Nov 2024 23:20:27 +0530 Subject: [PATCH 03/10] use ServerStreaming scenario to demonstrate graceful shutdown --- examples/features/gracefulstop/README.md | 30 ++++++++-------- examples/features/gracefulstop/client/main.go | 36 ++++++++++--------- examples/features/gracefulstop/server/main.go | 3 -- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/examples/features/gracefulstop/README.md b/examples/features/gracefulstop/README.md index a336982ff329..0f46ff2cfc7c 100644 --- a/examples/features/gracefulstop/README.md +++ b/examples/features/gracefulstop/README.md @@ -1,31 +1,33 @@ # Graceful Stop This example demonstrates how to gracefully stop a gRPC server using -Server.GracefulStop(). +`Server.GracefulStop()`. ## How to run -Start the server which will listen to incoming gRPC requests as well as OS -interrupt signals (SIGINT/SIGTERM). After receiving interrupt signal, it calls -`s.GracefulStop()` to gracefully shut down the server. If graceful shutdown -doesn't happen in time, server is stopped forcefully. +Start the server which will serve `ServerStreaming` gRPC requests and listen to +an OS interrupt signal (SIGINT/SIGTERM). `ServerStreaming` handler returns an +error after sending 5 messages to the client or if client's context is +canceled. If an interrupt signal is received, it calls `s.GracefulStop()` to +gracefully shut down the server to make sure in-flight RPC is finished before +shutting down. If graceful shutdown doesn't happen in time, the server is +stopped forcefully. ```sh $ go run server/main.go ``` -In a separate terminal, start the client which will send multiple requests to -the server with some delay between each request. +In a separate terminal, start the client which will start a server stream to +the server and keep receiving messages until an error is received. Once an +error is received, it closes the stream. ```sh $ go run client/main.go ``` -Use Ctrl+C or SIGTERM to signal the server to shut down. - -The server begins a graceful stop: -- It finishes ongoing requests. -- Rejects new incoming requests. - -The client will notice the server's shutdown when a request fails. +Once the client starts receiving messages from server, use Ctrl+C or SIGTERM to +signal the server to shut down. +The server begins a graceful stop. It finish the in-flight request. In this +case, client will receive 5 messages followed by the stream failure from the +server, allowing client to close the stream gracefully, before shutting down. diff --git a/examples/features/gracefulstop/client/main.go b/examples/features/gracefulstop/client/main.go index 4ce401513200..f3316abd1257 100644 --- a/examples/features/gracefulstop/client/main.go +++ b/examples/features/gracefulstop/client/main.go @@ -26,10 +26,8 @@ import ( "time" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" - pb "google.golang.org/grpc/examples/helloworld/helloworld" - "google.golang.org/grpc/status" + pb "google.golang.org/grpc/examples/features/proto/echo" ) var addr = flag.String("addr", "localhost:50052", "the address to connect to") @@ -37,30 +35,34 @@ var addr = flag.String("addr", "localhost:50052", "the address to connect to") func main() { flag.Parse() - // Set up a connection to the server. conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - log.Fatalf("Failed to connect: %v", err) + log.Fatalf("Failed to create new client: %v", err) } defer conn.Close() - c := pb.NewGreeterClient(conn) + c := pb.NewEchoClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - for i := 1; i <= 10; i++ { - log.Printf("Calling SayHello %d time", i) - r, err := c.SayHello(ctx, &pb.HelloRequest{}) + stream, err := c.ServerStreamingEcho(ctx, &pb.EchoRequest{}) + if err != nil { + log.Fatalf("Error starting stream: %v", err) + } + + for { + r, err := stream.Recv() if err != nil { - if status.Code(err) != codes.InvalidArgument { - log.Printf("Received unexpected error: %v", err) - continue + // Handle the error and close the stream gracefully + log.Printf("Error sending request: %v\n", err) + err := stream.CloseSend() + if err != nil { + log.Fatalf("Error closing stream: %v", err) } - log.Printf("Received error: %v", err) - continue + log.Println("Stream closed gracefully") + break } - log.Printf("Received response: %s", r.Message) - time.Sleep(time.Second) + log.Printf(r.Message) } log.Printf("Client finished interaction with server.") diff --git a/examples/features/gracefulstop/server/main.go b/examples/features/gracefulstop/server/main.go index 8c3f5bb9ebbb..004d498f2ac3 100644 --- a/examples/features/gracefulstop/server/main.go +++ b/examples/features/gracefulstop/server/main.go @@ -60,15 +60,12 @@ func (s *server) ServerStreamingEcho(_ *pb.EchoRequest, stream pb.Echo_ServerStr atomic.AddInt32(&streamMessages, 1) mu.Lock() - if streamMessages > 5 { return fmt.Errorf("request failed") } - if err := stream.Send(&pb.EchoResponse{Message: fmt.Sprintf("Messages Sent: %d", streamMessages)}); err != nil { return err } - mu.Unlock() time.Sleep(1 * time.Second) From 9e46942a856c664595eeb7223d4f10194ea24caa Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Wed, 27 Nov 2024 18:54:36 +0530 Subject: [PATCH 04/10] remove need ctrl+c --- examples/features/gracefulstop/README.md | 26 ++++----- examples/features/gracefulstop/client/main.go | 21 ++++---- examples/features/gracefulstop/server/main.go | 53 ++++++------------- 3 files changed, 37 insertions(+), 63 deletions(-) diff --git a/examples/features/gracefulstop/README.md b/examples/features/gracefulstop/README.md index 0f46ff2cfc7c..76ad05e91310 100644 --- a/examples/features/gracefulstop/README.md +++ b/examples/features/gracefulstop/README.md @@ -5,29 +5,25 @@ This example demonstrates how to gracefully stop a gRPC server using ## How to run -Start the server which will serve `ServerStreaming` gRPC requests and listen to -an OS interrupt signal (SIGINT/SIGTERM). `ServerStreaming` handler returns an -error after sending 5 messages to the client or if client's context is -canceled. If an interrupt signal is received, it calls `s.GracefulStop()` to -gracefully shut down the server to make sure in-flight RPC is finished before -shutting down. If graceful shutdown doesn't happen in time, the server is -stopped forcefully. +Start the server which will serve `ServerStreaming` gRPC requests. It spawns +a go routine before starting the server that waits for signal from handler that +server stream has started and initiates `Server.GracefulStop()`. Therefore, +server will stop only after the in-flight rpc is finished. ```sh $ go run server/main.go ``` In a separate terminal, start the client which will start a server stream to -the server and keep receiving messages until an error is received. Once an -error is received, it closes the stream. +the server and wait to receive 5 messages before closing the stream. +`ServerStreaming` handler signals the server to initiate graceful shutdown and +start sending stream of messages to client indefinitely until client closes the +stream. ```sh $ go run client/main.go ``` -Once the client starts receiving messages from server, use Ctrl+C or SIGTERM to -signal the server to shut down. - -The server begins a graceful stop. It finish the in-flight request. In this -case, client will receive 5 messages followed by the stream failure from the -server, allowing client to close the stream gracefully, before shutting down. +Once the client client finish receiving 5 messages from server, it sends +`stream.CloseSend()` to close the stream which finishes the in-flight request. +The server then gracefully stop. diff --git a/examples/features/gracefulstop/client/main.go b/examples/features/gracefulstop/client/main.go index f3316abd1257..6a775bdfde55 100644 --- a/examples/features/gracefulstop/client/main.go +++ b/examples/features/gracefulstop/client/main.go @@ -45,25 +45,22 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() + // Start a server stream and receive 5 messages before closing the stream. + // This will initiate the graceful stop on the server. stream, err := c.ServerStreamingEcho(ctx, &pb.EchoRequest{}) if err != nil { log.Fatalf("Error starting stream: %v", err) } - - for { + // Server must complete the in-flight streaming RPC so client should + // receive 5 messages before stopping. + for i := 0; i < 5; i++ { r, err := stream.Recv() if err != nil { - // Handle the error and close the stream gracefully - log.Printf("Error sending request: %v\n", err) - err := stream.CloseSend() - if err != nil { - log.Fatalf("Error closing stream: %v", err) - } - log.Println("Stream closed gracefully") - break + log.Fatalf("Error receiving message: %v", err) } + time.Sleep(10 * time.Millisecond) log.Printf(r.Message) } - - log.Printf("Client finished interaction with server.") + stream.CloseSend() + log.Printf("Client finished streaming.") } diff --git a/examples/features/gracefulstop/server/main.go b/examples/features/gracefulstop/server/main.go index 004d498f2ac3..445fb7124763 100644 --- a/examples/features/gracefulstop/server/main.go +++ b/examples/features/gracefulstop/server/main.go @@ -23,12 +23,8 @@ import ( "fmt" "log" "net" - "os" - "os/signal" "sync" "sync/atomic" - "syscall" - "time" "google.golang.org/grpc" pb "google.golang.org/grpc/examples/features/proto/echo" @@ -38,6 +34,7 @@ var ( port = flag.Int("port", 50052, "port number") streamMessages int32 mu sync.Mutex + streamStart chan struct{} // to signal if server streaming started ) type server struct { @@ -45,35 +42,27 @@ type server struct { } // ServerStreamingEcho implements the EchoService.ServerStreamingEcho method. -// It receives an EchoRequest and sends back a stream of EchoResponses. -// The stream will contain up to 5 messages, each sent after a 1-second delay. -// If the client cancels the request or if more than 5 messages are sent, -// the stream will be closed with an error. +// It receives an EchoRequest and sends back a stream of EchoResponses until an +// error occurs or the stream is closed. func (s *server) ServerStreamingEcho(_ *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error { - ctx := stream.Context() + // Signal streaming start to initiate graceful stop which should wait until + // server streaming finishes. + streamStart <- struct{}{} for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - atomic.AddInt32(&streamMessages, 1) + atomic.AddInt32(&streamMessages, 1) - mu.Lock() - if streamMessages > 5 { - return fmt.Errorf("request failed") - } - if err := stream.Send(&pb.EchoResponse{Message: fmt.Sprintf("Messages Sent: %d", streamMessages)}); err != nil { - return err - } - mu.Unlock() - - time.Sleep(1 * time.Second) + mu.Lock() + if err := stream.Send(&pb.EchoResponse{Message: fmt.Sprintf("Messages Sent: %d", streamMessages)}); err != nil { + log.Printf("Stream is closed: %v. Stop Streaming", err) + return err } + mu.Unlock() } } func main() { + streamStart = make(chan struct{}) flag.Parse() address := fmt.Sprintf(":%v", *port) @@ -82,22 +71,14 @@ func main() { log.Fatalf("failed to listen: %v", err) } - // Create a channel to listen for OS signals. - stop := make(chan os.Signal, 1) - signal.Notify(stop, os.Interrupt, syscall.SIGTERM) - s := grpc.NewServer() pb.RegisterEchoServer(s, &server{}) go func() { - // Wait for an OS signal for graceful shutdown. - <-stop - timer := time.AfterFunc(10*time.Second, func() { // forceful stop after 10 seconds - log.Printf("Graceful shutdown did not complete within 10 seconds. Forcing shutdown...") - s.Stop() - }) - defer timer.Stop() - s.GracefulStop() + <-streamStart // wait until server streaming starts + log.Println("Initiating graceful shutdown...") + s.GracefulStop() // gracefully stop server after in-flight server streaming rpc finishes + log.Println("Server stopped gracefully.") }() if err := s.Serve(lis); err != nil { From d71a54b468dfd8483a5edaad8283e6d4baab47df Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Thu, 28 Nov 2024 01:23:31 +0530 Subject: [PATCH 05/10] Unary and Client Stream to demo graceful stop --- examples/features/gracefulstop/client/main.go | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/examples/features/gracefulstop/client/main.go b/examples/features/gracefulstop/client/main.go index 6a775bdfde55..75bfbb4a996e 100644 --- a/examples/features/gracefulstop/client/main.go +++ b/examples/features/gracefulstop/client/main.go @@ -45,22 +45,27 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - // Start a server stream and receive 5 messages before closing the stream. - // This will initiate the graceful stop on the server. - stream, err := c.ServerStreamingEcho(ctx, &pb.EchoRequest{}) + // Start a client stream and keep calling the `c.UnaryEcho` until receiving + // an error. Error will indicate that server graceful stop is initiated and + // it won't accept any new requests. + stream, err := c.ClientStreamingEcho(ctx) if err != nil { log.Fatalf("Error starting stream: %v", err) } - // Server must complete the in-flight streaming RPC so client should - // receive 5 messages before stopping. - for i := 0; i < 5; i++ { - r, err := stream.Recv() + + for { + r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "Hello"}) if err != nil { - log.Fatalf("Error receiving message: %v", err) + log.Printf("Error calling `UnaryEcho`. Server graceful stop initiated: %v", err) + break } time.Sleep(10 * time.Millisecond) log.Printf(r.Message) } - stream.CloseSend() - log.Printf("Client finished streaming.") + + r, err := stream.CloseAndRecv() + if err != nil { + log.Fatalf("Error closing stream: %v", err) + } + log.Printf(r.Message) } From a7d55e8db153db2cff38292e95385944877204f9 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Thu, 28 Nov 2024 01:25:20 +0530 Subject: [PATCH 06/10] Unary and Client Stream to demo graceful stop --- examples/features/gracefulstop/README.md | 29 ++++++---- examples/features/gracefulstop/server/main.go | 55 ++++++++++++------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/examples/features/gracefulstop/README.md b/examples/features/gracefulstop/README.md index 76ad05e91310..3d2baea125ce 100644 --- a/examples/features/gracefulstop/README.md +++ b/examples/features/gracefulstop/README.md @@ -5,25 +5,30 @@ This example demonstrates how to gracefully stop a gRPC server using ## How to run -Start the server which will serve `ServerStreaming` gRPC requests. It spawns -a go routine before starting the server that waits for signal from handler that -server stream has started and initiates `Server.GracefulStop()`. Therefore, -server will stop only after the in-flight rpc is finished. +Start the server with a client streaming and unary request handler. When client +streaming is started, client streaming handler signals the server to initiate +graceful stop and waits for the stream to be closed or aborted. Until the +`Server.GracefulStop()` is initiated, server will continue to accept unary +requests. Once `Server.GracefulStop()` is initiated, server will not accept +new unary requests. ```sh $ go run server/main.go ``` -In a separate terminal, start the client which will start a server stream to -the server and wait to receive 5 messages before closing the stream. -`ServerStreaming` handler signals the server to initiate graceful shutdown and -start sending stream of messages to client indefinitely until client closes the -stream. +In a separate terminal, start the client which will start the client stream to +the server and starts making unary requests until receiving an error. Error +will indicate that the server graceful shutdown is initiated so client will +stop making further unary requests and close the client stream. ```sh $ go run client/main.go ``` -Once the client client finish receiving 5 messages from server, it sends -`stream.CloseSend()` to close the stream which finishes the in-flight request. -The server then gracefully stop. +As part of unary requests server will keep track of number of unary requests +processed. As part of unary response it returns that number and once the client +has successfully closed the stream, it returns the total number of unary +requests processed as response. The number from stream response will be equal +to the number from last unary response. This indicates that server has +processed all in-flight requests before shutting down. + diff --git a/examples/features/gracefulstop/server/main.go b/examples/features/gracefulstop/server/main.go index 445fb7124763..788471d95c4e 100644 --- a/examples/features/gracefulstop/server/main.go +++ b/examples/features/gracefulstop/server/main.go @@ -19,50 +19,61 @@ package main import ( + "context" + "errors" "flag" "fmt" + "io" "log" "net" - "sync" "sync/atomic" + "time" "google.golang.org/grpc" pb "google.golang.org/grpc/examples/features/proto/echo" ) var ( - port = flag.Int("port", 50052, "port number") - streamMessages int32 - mu sync.Mutex - streamStart chan struct{} // to signal if server streaming started + port = flag.Int("port", 50052, "port number") ) type server struct { pb.UnimplementedEchoServer + + unaryRequests int32 // to track number of unary RPCs processed + streamStart chan struct{} // to signal if server streaming started } -// ServerStreamingEcho implements the EchoService.ServerStreamingEcho method. -// It receives an EchoRequest and sends back a stream of EchoResponses until an -// error occurs or the stream is closed. -func (s *server) ServerStreamingEcho(_ *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error { +// ClientStreamingEcho implements the EchoService.ClientStreamingEcho method. +// It signals the server that streaming has started and waits for the stream to +// be done or aborted. If `io.EOF` is received on stream that means client +// has successfully closed the stream using `stream.CloseAndRecv()`, so it +// returns an `EchoResponse` with the total number of unary RPCs processed +// otherwise, it returns the error indicating stream is aborted. +func (s *server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error { // Signal streaming start to initiate graceful stop which should wait until // server streaming finishes. - streamStart <- struct{}{} - - for { - atomic.AddInt32(&streamMessages, 1) + s.streamStart <- struct{}{} - mu.Lock() - if err := stream.Send(&pb.EchoResponse{Message: fmt.Sprintf("Messages Sent: %d", streamMessages)}); err != nil { - log.Printf("Stream is closed: %v. Stop Streaming", err) - return err + if err := stream.RecvMsg(&pb.EchoResponse{}); err != nil { + if errors.Is(err, io.EOF) { + stream.SendAndClose(&pb.EchoResponse{Message: fmt.Sprintf("Total Unary Requests Processed: %d", s.unaryRequests)}) + return nil } - mu.Unlock() + return err } + + return nil +} + +// UnaryEcho implements the EchoService.UnaryEcho method. It increments +// `s.unaryRequests` on every call and returns it as part of `EchoResponse`. +func (s *server) UnaryEcho(_ context.Context, _ *pb.EchoRequest) (*pb.EchoResponse, error) { + atomic.AddInt32(&s.unaryRequests, 1) + return &pb.EchoResponse{Message: fmt.Sprintf("Request Processed: %d", s.unaryRequests)}, nil } func main() { - streamStart = make(chan struct{}) flag.Parse() address := fmt.Sprintf(":%v", *port) @@ -72,10 +83,12 @@ func main() { } s := grpc.NewServer() - pb.RegisterEchoServer(s, &server{}) + ss := &server{streamStart: make(chan struct{})} + pb.RegisterEchoServer(s, ss) go func() { - <-streamStart // wait until server streaming starts + <-ss.streamStart // wait until server streaming starts + time.Sleep(1 * time.Second) log.Println("Initiating graceful shutdown...") s.GracefulStop() // gracefully stop server after in-flight server streaming rpc finishes log.Println("Server stopped gracefully.") From f4424d042a0cd6a6d6532181f6d634ceb2d467e0 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Thu, 28 Nov 2024 22:31:51 +0530 Subject: [PATCH 07/10] address structure comments --- examples/features/gracefulstop/client/main.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/examples/features/gracefulstop/client/main.go b/examples/features/gracefulstop/client/main.go index 75bfbb4a996e..932ca52e7d99 100644 --- a/examples/features/gracefulstop/client/main.go +++ b/examples/features/gracefulstop/client/main.go @@ -22,6 +22,7 @@ package main import ( "context" "flag" + "fmt" "log" "time" @@ -53,13 +54,17 @@ func main() { log.Fatalf("Error starting stream: %v", err) } + // Keep track of successful unary requests which can be compared later to + // the successful unary requests reported by the server. + unaryRequests := 0 for { r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "Hello"}) if err != nil { log.Printf("Error calling `UnaryEcho`. Server graceful stop initiated: %v", err) break } - time.Sleep(10 * time.Millisecond) + unaryRequests++ + time.Sleep(200 * time.Millisecond) log.Printf(r.Message) } @@ -67,5 +72,8 @@ func main() { if err != nil { log.Fatalf("Error closing stream: %v", err) } - log.Printf(r.Message) + if fmt.Sprintf("%d", unaryRequests) != r.Message { + log.Fatalf("Got %s successfull unary requests processed from server, want: %d", r.Message, unaryRequests) + } + log.Printf("Successful unary requests processed by server and made by client are same: %d", unaryRequests) } From 2ddb80e03bd4fe286c3b244b3d81b305f6769e20 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Thu, 28 Nov 2024 22:33:42 +0530 Subject: [PATCH 08/10] address structure comments --- examples/features/gracefulstop/README.md | 57 +++++++++++-------- examples/features/gracefulstop/client/main.go | 2 +- examples/features/gracefulstop/server/main.go | 15 +++-- 3 files changed, 45 insertions(+), 29 deletions(-) diff --git a/examples/features/gracefulstop/README.md b/examples/features/gracefulstop/README.md index 3d2baea125ce..b9a7eff07ec2 100644 --- a/examples/features/gracefulstop/README.md +++ b/examples/features/gracefulstop/README.md @@ -1,34 +1,45 @@ # Graceful Stop This example demonstrates how to gracefully stop a gRPC server using -`Server.GracefulStop()`. +`Server.GracefulStop()`. The graceful shutdown process involves two key steps: -## How to run +- Initiate `Server.GracefulStop()`. This function blocks until all currently + running RPCs have completed. This ensures that in-flight requests are + allowed to finish processing. -Start the server with a client streaming and unary request handler. When client -streaming is started, client streaming handler signals the server to initiate -graceful stop and waits for the stream to be closed or aborted. Until the -`Server.GracefulStop()` is initiated, server will continue to accept unary -requests. Once `Server.GracefulStop()` is initiated, server will not accept -new unary requests. +- It's crucial to call `Server.Stop()` with a timeout before calling + `GracefulStop()`. This acts as a safety net, ensuring that the server + eventually shuts down even if some in-flight RPCs don't complete within a + reasonable timeframe. This prevents indefinite blocking. -```sh -$ go run server/main.go -``` +## Try it -In a separate terminal, start the client which will start the client stream to -the server and starts making unary requests until receiving an error. Error -will indicate that the server graceful shutdown is initiated so client will -stop making further unary requests and close the client stream. +``` +go run server/main.go +``` -```sh -$ go run client/main.go ``` +go run client/main.go +``` + +## Explanation + +The server starts with a client streaming and unary request handler. When +client streaming is started, client streaming handler signals the server to +initiate graceful stop and waits for the stream to be closed or aborted. Until +the`Server.GracefulStop()` is initiated, server will continue to accept unary +requests. Once `Server.GracefulStop()` is initiated, server will not accept +new unary requests. -As part of unary requests server will keep track of number of unary requests -processed. As part of unary response it returns that number and once the client -has successfully closed the stream, it returns the total number of unary -requests processed as response. The number from stream response will be equal -to the number from last unary response. This indicates that server has -processed all in-flight requests before shutting down. +Client will start the client stream to the server and starts making unary +requests until receiving an error. Error will indicate that the server graceful +shutdown is initiated so client will stop making further unary requests and +closes the client stream. + +Server and client will keep track of number of unary requests processed on +their side. Once the client has successfully closed the stream, server returns +the total number of unary requests processed as response. The number from +stream response should be equal to the number of unary requests tracked by +client. This indicates that server has processed all in-flight requests before +shutting down. diff --git a/examples/features/gracefulstop/client/main.go b/examples/features/gracefulstop/client/main.go index 932ca52e7d99..acf2963bbbd8 100644 --- a/examples/features/gracefulstop/client/main.go +++ b/examples/features/gracefulstop/client/main.go @@ -73,7 +73,7 @@ func main() { log.Fatalf("Error closing stream: %v", err) } if fmt.Sprintf("%d", unaryRequests) != r.Message { - log.Fatalf("Got %s successfull unary requests processed from server, want: %d", r.Message, unaryRequests) + log.Fatalf("Got %s successful unary requests processed from server, want: %d", r.Message, unaryRequests) } log.Printf("Successful unary requests processed by server and made by client are same: %d", unaryRequests) } diff --git a/examples/features/gracefulstop/server/main.go b/examples/features/gracefulstop/server/main.go index 788471d95c4e..7b9233f82fe3 100644 --- a/examples/features/gracefulstop/server/main.go +++ b/examples/features/gracefulstop/server/main.go @@ -40,7 +40,7 @@ var ( type server struct { pb.UnimplementedEchoServer - unaryRequests int32 // to track number of unary RPCs processed + unaryRequests atomic.Int32 // to track number of unary RPCs processed streamStart chan struct{} // to signal if server streaming started } @@ -57,7 +57,7 @@ func (s *server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) e if err := stream.RecvMsg(&pb.EchoResponse{}); err != nil { if errors.Is(err, io.EOF) { - stream.SendAndClose(&pb.EchoResponse{Message: fmt.Sprintf("Total Unary Requests Processed: %d", s.unaryRequests)}) + stream.SendAndClose(&pb.EchoResponse{Message: fmt.Sprintf("%d", s.unaryRequests.Load())}) return nil } return err @@ -68,9 +68,9 @@ func (s *server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) e // UnaryEcho implements the EchoService.UnaryEcho method. It increments // `s.unaryRequests` on every call and returns it as part of `EchoResponse`. -func (s *server) UnaryEcho(_ context.Context, _ *pb.EchoRequest) (*pb.EchoResponse, error) { - atomic.AddInt32(&s.unaryRequests, 1) - return &pb.EchoResponse{Message: fmt.Sprintf("Request Processed: %d", s.unaryRequests)}, nil +func (s *server) UnaryEcho(_ context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { + s.unaryRequests.Add(1) + return &pb.EchoResponse{Message: req.Message}, nil } func main() { @@ -90,6 +90,11 @@ func main() { <-ss.streamStart // wait until server streaming starts time.Sleep(1 * time.Second) log.Println("Initiating graceful shutdown...") + timer := time.AfterFunc(10*time.Second, func() { + log.Println("Server couldn't stop gracefully in time. Doing force stop.") + s.Stop() + }) + defer timer.Stop() s.GracefulStop() // gracefully stop server after in-flight server streaming rpc finishes log.Println("Server stopped gracefully.") }() From c8e4d3a3caa1dd8a9eaacf3ef24573e3e42999ba Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Tue, 3 Dec 2024 09:42:26 +0530 Subject: [PATCH 09/10] add features/gracefulstop to example_test.sh --- examples/examples_test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/examples_test.sh b/examples/examples_test.sh index ef0d34769482..5e7c1985b896 100755 --- a/examples/examples_test.sh +++ b/examples/examples_test.sh @@ -71,6 +71,7 @@ EXAMPLES=( "features/orca" "features/retry" "features/unix_abstract" + "features/gracefulstop" ) declare -A SERVER_ARGS=( From 0b880da8e754749d8f95608df62ca1cbaa2c7c67 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Thu, 5 Dec 2024 22:36:43 +0530 Subject: [PATCH 10/10] add client and server expectation outputs --- examples/examples_test.sh | 2 ++ examples/features/gracefulstop/client/main.go | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/examples_test.sh b/examples/examples_test.sh index 5e7c1985b896..28f050c0a0e8 100755 --- a/examples/examples_test.sh +++ b/examples/examples_test.sh @@ -130,6 +130,7 @@ declare -A EXPECTED_SERVER_OUTPUT=( ["features/retry"]="request succeeded count: 4" ["features/unix_abstract"]="serving on @abstract-unix-socket" ["features/advancedtls"]="" + ["features/gracefulstop"]="Server stopped gracefully." ) declare -A EXPECTED_CLIENT_OUTPUT=( @@ -155,6 +156,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=( ["features/retry"]="UnaryEcho reply: message:\"Try and Success\"" ["features/unix_abstract"]="calling echo.Echo/UnaryEcho to unix-abstract:abstract-unix-socket" ["features/advancedtls"]="" + ["features/gracefulstop"]="Successful unary requests processed by server and made by client are same." ) cd ./examples diff --git a/examples/features/gracefulstop/client/main.go b/examples/features/gracefulstop/client/main.go index acf2963bbbd8..61f5874b3940 100644 --- a/examples/features/gracefulstop/client/main.go +++ b/examples/features/gracefulstop/client/main.go @@ -67,6 +67,7 @@ func main() { time.Sleep(200 * time.Millisecond) log.Printf(r.Message) } + log.Printf("Successful unary requests made by client: %d", unaryRequests) r, err := stream.CloseAndRecv() if err != nil { @@ -75,5 +76,5 @@ func main() { if fmt.Sprintf("%d", unaryRequests) != r.Message { log.Fatalf("Got %s successful unary requests processed from server, want: %d", r.Message, unaryRequests) } - log.Printf("Successful unary requests processed by server and made by client are same: %d", unaryRequests) + log.Printf("Successful unary requests processed by server and made by client are same.") }