Skip to content

Commit

Permalink
[Issue 177] add multiple hosts support (#484)
Browse files Browse the repository at this point in the history
Fixes #177 

Master Issue apache/pulsar#3218

### Motivation

Add support for multiple hosts to the Go client.

### Modifications

- Add service URI & service name resolver
- Add the service name resolver to the lookup service & RPC client
- add unit tests
- add integration tests

### Verifying this change

- [ ] Make sure that the change passes the CI checks.
  • Loading branch information
freeznet authored Mar 9, 2021
1 parent 2e1f9c4 commit 5fb7f55
Show file tree
Hide file tree
Showing 13 changed files with 829 additions and 38 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
Expand Down
6 changes: 4 additions & 2 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ func newClient(options ClientOptions) (Client, error) {
log: logger,
metrics: metrics,
}
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics)
c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil, logger, metrics)
serviceNameResolver := internal.NewPulsarServiceNameResolver(url)

c.rpcClient = internal.NewRPCClient(url, serviceNameResolver, c.cnxPool, operationTimeout, logger, metrics)
c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver, tlsConfig != nil, logger, metrics)
c.handlers = internal.NewClientHandlers()

return c, nil
Expand Down
56 changes: 56 additions & 0 deletions pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package pulsar

import (
"context"
"crypto/tls"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -458,3 +459,58 @@ func anonymousNamespacePolicy() map[string]interface{} {
"replication_clusters": []string{"standalone"},
}
}

// TestRetryWithMultipleHosts verifies that a client configured with a
// multi-host service URL can still produce and consume messages when one of
// the listed hosts is unreachable.
func TestRetryWithMultipleHosts(t *testing.T) {
	// The service URL lists an unreachable port (6600) together with the actual
	// broker port (6650) so that the retry logic is exercised.
	client, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6600,localhost:6650",
	})
	if !assert.Nil(t, err) {
		// Stop here: deferring Close on a client that failed to build would
		// panic and hide the real failure.
		return
	}
	defer client.Close()

	topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()

	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topic,
	})
	if !assert.Nil(t, err) {
		return
	}
	defer producer.Close()

	ctx := context.Background()
	var msgIDs [][]byte

	for i := 0; i < 10; i++ {
		msgID, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		// Abort on the first failed send: Serialize must not be called on a
		// missing message ID, and the receive loop below expects all ten
		// messages to have been published.
		if !assert.Nil(t, err) || !assert.NotNil(t, msgID) {
			return
		}
		msgIDs = append(msgIDs, msgID.Serialize())
	}

	assert.Equal(t, 10, len(msgIDs))

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                       topic,
		SubscriptionName:            "retry-multi-hosts-sub",
		Type:                        Shared,
		SubscriptionInitialPosition: SubscriptionPositionEarliest,
	})
	if !assert.Nil(t, err) {
		return
	}
	defer consumer.Close()

	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(ctx)
		// msg is not usable when Receive reports an error.
		if !assert.Nil(t, err) {
			return
		}
		assert.Contains(t, msgIDs, msg.ID().Serialize())
		consumer.Ack(msg)
	}

	err = consumer.Unsubscribe()
	assert.Nil(t, err)
}
4 changes: 4 additions & 0 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
}

if err := cnx.waitUntilReady(); err != nil {
if !wasCached {
p.pool.Delete(key)
p.log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr)
}
return nil, err
}
return cnx, nil
Expand Down
27 changes: 15 additions & 12 deletions pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,22 @@ type LookupService interface {
}

type lookupService struct {
rpcClient RPCClient
serviceURL *url.URL
tlsEnabled bool
log log.Logger
metrics *Metrics
rpcClient RPCClient
serviceNameResolver ServiceNameResolver
tlsEnabled bool
log log.Logger
metrics *Metrics
}

// NewLookupService init a lookup service struct and return an object of LookupService.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL,
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
return &lookupService{
rpcClient: rpcClient,
serviceURL: serviceURL,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
rpcClient: rpcClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
}
}

Expand All @@ -78,7 +78,10 @@ func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (lo

var physicalAddr *url.URL
if lr.GetProxyThroughServiceUrl() {
physicalAddr = ls.serviceURL
physicalAddr, err = ls.serviceNameResolver.ResolveHost()
if err != nil {
return nil, nil, err
}
} else {
physicalAddr = logicalAddress
}
Expand Down
55 changes: 46 additions & 9 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupT
func TestLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

ls := NewLookupService(&mockedLookupRPCClient{
t: t,
Expand All @@ -131,7 +132,7 @@ func TestLookupSuccess(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -144,6 +145,7 @@ func TestLookupSuccess(t *testing.T) {
func TestTlsLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

ls := NewLookupService(&mockedLookupRPCClient{
t: t,
Expand All @@ -163,7 +165,7 @@ func TestTlsLookupSuccess(t *testing.T) {
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
}, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -176,6 +178,7 @@ func TestTlsLookupSuccess(t *testing.T) {
func TestLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

ls := NewLookupService(&mockedLookupRPCClient{
t: t,
Expand All @@ -196,7 +199,7 @@ func TestLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down Expand Up @@ -229,7 +232,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
}, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down Expand Up @@ -273,7 +276,7 @@ func TestLookupWithRedirect(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down Expand Up @@ -317,7 +320,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
}, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down Expand Up @@ -350,7 +353,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(false),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand Down Expand Up @@ -378,7 +381,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
Authoritative: proto.Bool(true),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand Down Expand Up @@ -447,6 +450,7 @@ func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(cnx Connection, re
func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
t: t,
Expand All @@ -464,10 +468,43 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
assert.NoError(t, err)
assert.NotNil(t, metadata)
assert.Equal(t, metadata.GetPartitions(), uint32(1))
}

// TestLookupSuccessWithMultipleHosts performs a topic lookup through a lookup
// service built from a service URL that lists several hosts. The mocked
// response is a Connect response that does not set ProxyThroughServiceUrl, and
// the test expects both the logical and the physical address of the result to
// be the broker URL carried in that response.
func TestLookupSuccessWithMultipleHosts(t *testing.T) {
	serviceURL, parseErr := url.Parse("pulsar://host1,host2,host3:6650")
	assert.NoError(t, parseErr)
	resolver := NewPulsarServiceNameResolver(serviceURL)

	// One lookup request is expected, answered by one mocked response.
	rpc := &mockedLookupRPCClient{
		t: t,

		expectedRequests: []pb.CommandLookupTopic{
			{
				RequestId:     proto.Uint64(1),
				Topic:         proto.String("my-topic"),
				Authoritative: proto.Bool(false),
			},
		},
		mockedResponses: []pb.CommandLookupTopicResponse{
			{
				RequestId:        proto.Uint64(1),
				Response:         responseType(pb.CommandLookupTopicResponse_Connect),
				Authoritative:    proto.Bool(true),
				BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
			},
		},
	}

	lookup := NewLookupService(rpc, serviceURL, resolver, false,
		log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

	result, lookupErr := lookup.Lookup("my-topic")
	assert.NoError(t, lookupErr)
	assert.NotNil(t, result)

	const wantBroker = "pulsar://broker-1:6650"
	assert.Equal(t, wantBroker, result.LogicalAddr.String())
	assert.Equal(t, wantBroker, result.PhysicalAddr.String())
}
35 changes: 21 additions & 14 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type RPCClient interface {
}

type rpcClient struct {
serviceURL *url.URL
serviceNameResolver ServiceNameResolver
pool ConnectionPool
requestTimeout time.Duration
requestIDGenerator uint64
Expand All @@ -66,22 +66,26 @@ type rpcClient struct {
metrics *Metrics
}

func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, pool ConnectionPool,
requestTimeout time.Duration, logger log.Logger, metrics *Metrics) RPCClient {
return &rpcClient{
serviceURL: serviceURL,
pool: pool,
requestTimeout: requestTimeout,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
serviceNameResolver: serviceNameResolver,
pool: pool,
requestTimeout: requestTimeout,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
}
}

func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {

rpcResult, err := c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
if _, ok := err.(net.Error); ok {
host, err := c.serviceNameResolver.ResolveHost()
if err != nil {
c.log.Errorf("request host resolve failed with error: {%v}", err)
return nil, err
}
rpcResult, err := c.Request(host, host, requestID, cmdType, message)
if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") {
// We can retry this kind of requests over a connection error because they're
// not specific to a particular broker.
backoff := Backoff{100 * time.Millisecond}
Expand All @@ -92,17 +96,20 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_
retryTime = backoff.Next()
c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)

rpcResult, err = c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
if _, ok := err.(net.Error); ok {
host, err = c.serviceNameResolver.ResolveHost()
if err != nil {
c.log.Errorf("Retrying request host resolve failed with error: {%v}", err)
continue
}
rpcResult, err = c.Request(host, host, requestID, cmdType, message)
if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") {
continue
} else {
// We either succeeded or encountered a non connection error
break
}
}
}

return rpcResult, err
}

Expand Down
Loading

0 comments on commit 5fb7f55

Please sign in to comment.