Skip to content

Commit

Permalink
Expose underlying gRPC client and server objects (#311)
Browse files Browse the repository at this point in the history
* Expose underlying gRPC client and server objects

Fixes #204

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Fixed reported race conditions

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Support for Go 1.17

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle authored Sep 30, 2022
1 parent f182441 commit cfd7483
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
9 changes: 6 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,23 +300,21 @@ type GRPCClient struct {
ctxCancelFunc context.CancelFunc
protoClient pb.DaprClient
authToken string
mux sync.Mutex
}

// Close cleans up all resources created by the client.
func (c *GRPCClient) Close() {
c.ctxCancelFunc()
if c.connection != nil {
c.connection.Close()
c.connection = nil
}
}

// WithAuthToken sets Dapr API token on the instantiated client.
// Allows empty string to reset token on existing client.
func (c *GRPCClient) WithAuthToken(token string) {
c.mux.Lock()
c.authToken = token
c.mux.Unlock()
}

// WithTraceID adds existing trace ID to the outgoing context.
Expand Down Expand Up @@ -349,3 +347,8 @@ func (c *GRPCClient) Shutdown(ctx context.Context) error {
func (c *GRPCClient) GrpcClient() pb.DaprClient {
return c.protoClient
}

// GrpcClientConn returns the grpc.ClientConn object used by this client.
func (c *GRPCClient) GrpcClientConn() *grpc.ClientConn {
return c.connection
}
39 changes: 31 additions & 8 deletions service/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package grpc
import (
"net"
"os"
"sync/atomic"

"github.com/pkg/errors"
"google.golang.org/grpc"
Expand Down Expand Up @@ -48,13 +49,20 @@ func NewServiceWithListener(lis net.Listener) common.Service {
}

func newService(lis net.Listener) *Server {
return &Server{
s := &Server{
listener: lis,
invokeHandlers: make(map[string]common.ServiceInvocationHandler),
topicRegistrar: make(internal.TopicRegistrar),
bindingHandlers: make(map[string]common.BindingInvocationHandler),
authToken: os.Getenv(common.AppAPITokenEnvVar),
}

gs := grpc.NewServer()
pb.RegisterAppCallbackServer(gs, s)
pb.RegisterAppCallbackHealthCheckServer(gs, s)
s.grpcServer = gs

return s
}

// Server is the gRPC service implementation for Dapr.
Expand All @@ -68,6 +76,7 @@ type Server struct {
healthCheckHandler common.HealthCheckHandler
authToken string
grpcServer *grpc.Server
started uint32
}

func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option) {
Expand All @@ -76,19 +85,33 @@ func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option

// Start registers the server and starts it.
func (s *Server) Start() error {
gs := grpc.NewServer()
pb.RegisterAppCallbackServer(gs, s)
pb.RegisterAppCallbackHealthCheckServer(gs, s)
s.grpcServer = gs
return gs.Serve(s.listener)
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
return errors.New("a gRPC server can only be started once")
}
return s.grpcServer.Serve(s.listener)
}

// Stop stops the previously started service.
// Stop stops the previously-started service.
func (s *Server) Stop() error {
return s.listener.Close()
if atomic.LoadUint32(&s.started) == 0 {
return nil
}
s.grpcServer.Stop()
s.grpcServer = nil
return nil
}

// GrecefulStop stops the previously-started service gracefully.
func (s *Server) GracefulStop() error {
if atomic.LoadUint32(&s.started) == 0 {
return nil
}
s.grpcServer.GracefulStop()
s.grpcServer = nil
return nil
}

// GrpcServer returns the grpc.Server object managed by the server.
func (s *Server) GrpcServer() *grpc.Server {
return s.grpcServer
}

0 comments on commit cfd7483

Please sign in to comment.