Skip to content

Commit

Permalink
Support listener name for go client (#502)
Browse files Browse the repository at this point in the history
* Support listener name for go client

Signed-off-by: xiaolongran <xiaolongran@tencent.com>

* fix test error

Signed-off-by: xiaolongran <xiaolongran@tencent.com>

* fix test error

Signed-off-by: xiaolongran <xiaolongran@tencent.com>

Co-authored-by: xiaolongran <xiaolongran@tencent.com>
  • Loading branch information
wolfstudy and wolfstudy authored Apr 6, 2021
1 parent 6ab17dc commit f17deac
Show file tree
Hide file tree
Showing 6 changed files with 708 additions and 676 deletions.
3 changes: 3 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type ClientOptions struct {
// Configure whether the Pulsar client verify the validity of the host name from broker (default: false)
TLSValidateHostname bool

// Configure the net model for vpc user to connect the pulsar broker
ListenerName string

// Max number of connections to a single broker that will kept in the pool. (Default: 1 connection)
MaxConnectionsPerBroker int

Expand Down
3 changes: 2 additions & 1 deletion pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func newClient(options ClientOptions) (Client, error) {
serviceNameResolver := internal.NewPulsarServiceNameResolver(url)

c.rpcClient = internal.NewRPCClient(url, serviceNameResolver, c.cnxPool, operationTimeout, logger, metrics)
c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver, tlsConfig != nil, logger, metrics)
c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver,
tlsConfig != nil, options.ListenerName, logger, metrics)
c.handlers = internal.NewClientHandlers()

return c, nil
Expand Down
18 changes: 11 additions & 7 deletions pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,21 @@ type lookupService struct {
rpcClient RPCClient
serviceNameResolver ServiceNameResolver
tlsEnabled bool
listenerName string
log log.Logger
metrics *Metrics
}

// NewLookupService init a lookup service struct and return an object of LookupService.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
tlsEnabled bool, listenerName string, logger log.Logger, metrics *Metrics) LookupService {
return &lookupService{
rpcClient: rpcClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
listenerName: listenerName,
}
}

Expand Down Expand Up @@ -96,9 +98,10 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
ls.metrics.LookupRequestsCount.Inc()
id := ls.rpcClient.NewRequestID()
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
RequestId: &id,
Topic: &topic,
Authoritative: proto.Bool(false),
RequestId: &id,
Topic: &topic,
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(ls.listenerName),
})
if err != nil {
return nil, err
Expand All @@ -120,9 +123,10 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {

id := ls.rpcClient.NewRequestID()
res, err = ls.rpcClient.Request(logicalAddress, physicalAddr, id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
RequestId: &id,
Topic: &topic,
Authoritative: lr.Authoritative,
RequestId: &id,
Topic: &topic,
Authoritative: lr.Authoritative,
AdvertisedListenerName: proto.String(ls.listenerName),
})
if err != nil {
return nil, err
Expand Down
97 changes: 54 additions & 43 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ func TestLookupSuccess(t *testing.T) {

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -132,7 +133,7 @@ func TestLookupSuccess(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -152,9 +153,10 @@ func TestTlsLookupSuccess(t *testing.T) {

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -165,7 +167,7 @@ func TestTlsLookupSuccess(t *testing.T) {
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
}, url, serviceNameResolver, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, true, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -185,9 +187,10 @@ func TestLookupWithProxy(t *testing.T) {

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -199,7 +202,7 @@ func TestLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -218,9 +221,10 @@ func TestTlsLookupWithProxy(t *testing.T) {

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -232,7 +236,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
}, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), true, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -252,14 +256,16 @@ func TestLookupWithRedirect(t *testing.T) {

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
},
{
RequestId: proto.Uint64(2),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(true),
RequestId: proto.Uint64(2),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(true),
AdvertisedListenerName: proto.String(""),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -276,7 +282,7 @@ func TestLookupWithRedirect(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -296,14 +302,16 @@ func TestTlsLookupWithRedirect(t *testing.T) {

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
},
{
RequestId: proto.Uint64(2),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(true),
RequestId: proto.Uint64(2),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(true),
AdvertisedListenerName: proto.String(""),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -320,7 +328,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
}, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), true, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -339,9 +347,10 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -353,7 +362,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(false),
},
},
}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand All @@ -369,9 +378,10 @@ func TestLookupWithLookupFailure(t *testing.T) {

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -381,7 +391,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
Authoritative: proto.Bool(true),
},
},
}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand Down Expand Up @@ -468,7 +478,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
},
},
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
assert.NoError(t, err)
Expand All @@ -486,9 +496,10 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {

expectedRequests: []pb.CommandLookupTopic{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
AdvertisedListenerName: proto.String(""),
Authoritative: proto.Bool(false),
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -499,7 +510,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit f17deac

Please sign in to comment.