Skip to content

Commit

Permalink
Replace gRPC port when resolving host address (#4174)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius committed May 4, 2021
1 parent 4a71b12 commit bc11811
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 40 deletions.
43 changes: 20 additions & 23 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,32 @@ func (cf *rpcClientFactory) NewFrontendClient() (frontend.Client, error) {
return cf.NewFrontendClientWithTimeout(frontend.DefaultTimeout, frontend.DefaultLongPollTimeout)
}

func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) {
resolver, err := cf.monitor.GetResolver(common.HistoryServiceName)
func (cf *rpcClientFactory) createKeyResolver(serviceName string) (func(key string) (string, error), error) {
resolver, err := cf.monitor.GetResolver(serviceName)
if err != nil {
return nil, err
}

keyResolver := func(key string) (string, error) {
return func(key string) (string, error) {
host, err := resolver.Lookup(key)
if err != nil {
return "", err
}
return host.GetAddress(), nil
hostAddress := host.GetAddress()
if cf.enableGRPCOutbound {
hostAddress, err = cf.rpcFactory.ReplaceGRPCPort(serviceName, hostAddress)
if err != nil {
return "", err
}
}
return hostAddress, nil
}, nil
}

func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) {
keyResolver, err := cf.createKeyResolver(common.HistoryServiceName)
if err != nil {
return nil, err
}

clientProvider := func(clientKey string) (interface{}, error) {
Expand Down Expand Up @@ -159,19 +173,11 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
timeout time.Duration,
longPollTimeout time.Duration,
) (matching.Client, error) {
resolver, err := cf.monitor.GetResolver(common.MatchingServiceName)
keyResolver, err := cf.createKeyResolver(common.MatchingServiceName)
if err != nil {
return nil, err
}

keyResolver := func(key string) (string, error) {
host, err := resolver.Lookup(key)
if err != nil {
return "", err
}
return host.GetAddress(), nil
}

clientProvider := func(clientKey string) (interface{}, error) {
if cf.enableGRPCOutbound {
return cf.newMatchingGRPCClient(clientKey)
Expand Down Expand Up @@ -199,20 +205,11 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeout(
timeout time.Duration,
longPollTimeout time.Duration,
) (frontend.Client, error) {

resolver, err := cf.monitor.GetResolver(common.FrontendServiceName)
keyResolver, err := cf.createKeyResolver(common.FrontendServiceName)
if err != nil {
return nil, err
}

keyResolver := func(key string) (string, error) {
host, err := resolver.Lookup(key)
if err != nil {
return "", err
}
return host.GetAddress(), nil
}

clientProvider := func(clientKey string) (interface{}, error) {
if cf.enableGRPCOutbound {
return cf.newFrontendGRPCClient(clientKey)
Expand Down
12 changes: 6 additions & 6 deletions common/config/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ func (d *RPCFactory) CreateGRPCDispatcherForOutbound(
serviceName string,
hostName string,
) (*yarpc.Dispatcher, error) {
grpcAddress, err := d.grpcPorts.GetGRPCAddress(serviceName, hostName)
if err != nil {
d.logger.Error("Failed to create GRPC outbound dispatcher", tag.Error(err))
return nil, err
}
return d.createOutboundDispatcher(callerName, serviceName, grpcAddress, d.grpc.NewSingleOutbound(grpcAddress))
return d.createOutboundDispatcher(callerName, serviceName, hostName, d.grpc.NewSingleOutbound(hostName))
}

// ReplaceGRPCPort replaces port in the address to grpc for a given service
func (d *RPCFactory) ReplaceGRPCPort(serviceName, hostAddress string) (string, error) {
return d.grpcPorts.GetGRPCAddress(serviceName, hostAddress)
}

func (d *RPCFactory) createOutboundDispatcher(
Expand Down
1 change: 1 addition & 0 deletions common/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type (
GetDispatcher() *yarpc.Dispatcher
CreateDispatcherForOutbound(callerName, serviceName, hostName string) (*yarpc.Dispatcher, error)
CreateGRPCDispatcherForOutbound(callerName, serviceName, hostName string) (*yarpc.Dispatcher, error)
ReplaceGRPCPort(serviceName, hostAddress string) (string, error)
}
)

Expand Down
16 changes: 5 additions & 11 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ func (c *rpcFactoryImpl) createDispatcher() *yarpc.Dispatcher {
}
inbounds = append(inbounds, c.ch.NewInbound())

grpcHostPort, err := getGRPCAddress(c.hostPort)
grpcHostPort, err := c.ReplaceGRPCPort("", c.hostPort)
if err != nil {
c.logger.Fatal("Failed to obtain GRPC address", tag.Error(err))
}
Expand Down Expand Up @@ -880,17 +880,12 @@ func (c *rpcFactoryImpl) CreateDispatcherForOutbound(

func (c *rpcFactoryImpl) CreateGRPCDispatcherForOutbound(
callerName, serviceName, hostName string) (*yarpc.Dispatcher, error) {
grpcAddress, err := getGRPCAddress(hostName)
if err != nil {
c.logger.Error("Failed to obtain GRPC address", tag.Error(err))
return nil, err
}

// Setup dispatcher(outbound) for onebox
d := yarpc.NewDispatcher(yarpc.Config{
Name: callerName,
Outbounds: yarpc.Outbounds{
serviceName: {Unary: c.grpc.NewSingleOutbound(grpcAddress)},
serviceName: {Unary: c.grpc.NewSingleOutbound(hostName)},
},
})
if err := d.Start(); err != nil {
Expand All @@ -900,10 +895,9 @@ func (c *rpcFactoryImpl) CreateGRPCDispatcherForOutbound(
return d, nil
}

const gprcPortOffset = 10
const grpcPortOffset = 10

func getGRPCAddress(hostPort string) (string, error) {
fmt.Println()
func (c *rpcFactoryImpl) ReplaceGRPCPort(_, hostPort string) (string, error) {
host, port, err := net.SplitHostPort(hostPort)
if err != nil {
return "", err
Expand All @@ -914,6 +908,6 @@ func getGRPCAddress(hostPort string) (string, error) {
return "", err
}

grpcAddress := net.JoinHostPort(host, strconv.Itoa(portInt+gprcPortOffset))
grpcAddress := net.JoinHostPort(host, strconv.Itoa(portInt+grpcPortOffset))
return grpcAddress, nil
}

0 comments on commit bc11811

Please sign in to comment.