diff --git a/go.sum b/go.sum index 7f61837d9f..a14857b2d2 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,7 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU= github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 5882dd4ff4..ee8ff50a24 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -123,8 +123,10 @@ func newClient(options ClientOptions) (Client, error) { log: logger, metrics: metrics, } - c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics) - c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil, logger, metrics) + serviceNameResolver := internal.NewPulsarServiceNameResolver(url) + + c.rpcClient = internal.NewRPCClient(url, serviceNameResolver, c.cnxPool, operationTimeout, logger, metrics) + c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver, tlsConfig != nil, logger, metrics) c.handlers = internal.NewClientHandlers() return c, nil diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 661eac2765..65732ea81d 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -18,6 +18,7 @@ package pulsar import ( + "context" "crypto/tls" "fmt" "io/ioutil" @@ -458,3 +459,58 @@ func anonymousNamespacePolicy() map[string]interface{} { "replication_clusters": []string{"standalone"}, } } + +func TestRetryWithMultipleHosts(t *testing.T) { + // Multi hosts included an unreached port and the actual port for verify retry logic + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6600,localhost:6650", + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + + assert.Nil(t, err) + defer producer.Close() + + ctx := context.Background() + var msgIDs [][]byte + + for i := 0; i < 10; i++ { + if msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + assert.Nil(t, err) + } else { + assert.NotNil(t, msgID) + msgIDs = append(msgIDs, msgID.Serialize()) + } + } + + assert.Equal(t, 10, len(msgIDs)) + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "retry-multi-hosts-sub", + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer consumer.Close() + + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + assert.Nil(t, err) + assert.Contains(t, msgIDs, msg.ID().Serialize()) + consumer.Ack(msg) + } + + err = consumer.Unsubscribe() + assert.Nil(t, err) + +} diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 54d30b3815..29e126723d 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -103,6 +103,10 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U } if err := cnx.waitUntilReady(); err != nil { + if !wasCached { + p.pool.Delete(key) + p.log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr) + } return nil, err } return cnx, nil diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go index 65fa383737..5b4e2ac1a1 100644 --- a/pulsar/internal/lookup_service.go +++ b/pulsar/internal/lookup_service.go @@ -45,22 +45,22 @@ type LookupService interface { } type lookupService struct { - rpcClient RPCClient - serviceURL *url.URL - tlsEnabled bool - log log.Logger - metrics *Metrics + rpcClient RPCClient + serviceNameResolver ServiceNameResolver + tlsEnabled bool + log log.Logger + metrics *Metrics } // NewLookupService init a lookup service struct and return an object of LookupService. -func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, +func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService { return &lookupService{ - rpcClient: rpcClient, - serviceURL: serviceURL, - tlsEnabled: tlsEnabled, - log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), - metrics: metrics, + rpcClient: rpcClient, + serviceNameResolver: serviceNameResolver, + tlsEnabled: tlsEnabled, + log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), + metrics: metrics, } } @@ -78,7 +78,10 @@ func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (lo var physicalAddr *url.URL if lr.GetProxyThroughServiceUrl() { - physicalAddr = ls.serviceURL + physicalAddr, err = ls.serviceNameResolver.ResolveHost() + if err != nil { + return nil, nil, err + } } else { physicalAddr = logicalAddress } diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go index ad0e1bd6d3..a3f9bfe150 100644 --- a/pulsar/internal/lookup_service_test.go +++ b/pulsar/internal/lookup_service_test.go @@ -112,6 +112,7 @@ func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupT func TestLookupSuccess(t *testing.T) { url, err := url.Parse("pulsar://example:6650") assert.NoError(t, err) + serviceNameResolver := NewPulsarServiceNameResolver(url) ls := NewLookupService(&mockedLookupRPCClient{ t: t, @@ -131,7 +132,7 @@ func TestLookupSuccess(t *testing.T) { BrokerServiceUrl: proto.String("pulsar://broker-1:6650"), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -144,6 +145,7 @@ func TestLookupSuccess(t *testing.T) { func TestTlsLookupSuccess(t *testing.T) { url, err := url.Parse("pulsar+ssl://example:6651") assert.NoError(t, err) + serviceNameResolver := NewPulsarServiceNameResolver(url) ls := NewLookupService(&mockedLookupRPCClient{ t: t, @@ -163,7 +165,7 @@ func TestTlsLookupSuccess(t *testing.T) { BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"), }, }, - }, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -176,6 +178,7 @@ func TestTlsLookupSuccess(t *testing.T) { func TestLookupWithProxy(t *testing.T) { url, err := url.Parse("pulsar://example:6650") assert.NoError(t, err) + serviceNameResolver := NewPulsarServiceNameResolver(url) ls := NewLookupService(&mockedLookupRPCClient{ t: t, @@ -196,7 +199,7 @@ func TestLookupWithProxy(t *testing.T) { ProxyThroughServiceUrl: proto.Bool(true), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -229,7 +232,7 @@ func TestTlsLookupWithProxy(t *testing.T) { ProxyThroughServiceUrl: proto.Bool(true), }, }, - }, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -273,7 +276,7 @@ func TestLookupWithRedirect(t *testing.T) { BrokerServiceUrl: proto.String("pulsar://broker-1:6650"), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -317,7 +320,7 @@ func TestTlsLookupWithRedirect(t *testing.T) { BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"), }, }, - }, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -350,7 +353,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) { ProxyThroughServiceUrl: proto.Bool(false), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.Error(t, err) @@ -378,7 +381,7 @@ func TestLookupWithLookupFailure(t *testing.T) { Authoritative: proto.Bool(true), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.Error(t, err) @@ -447,6 +450,7 @@ func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(cnx Connection, re func TestGetPartitionedTopicMetadataSuccess(t *testing.T) { url, err := url.Parse("pulsar://example:6650") assert.NoError(t, err) + serviceNameResolver := NewPulsarServiceNameResolver(url) ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{ t: t, @@ -464,10 +468,43 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) { Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) metadata, err := ls.GetPartitionedTopicMetadata("my-topic") assert.NoError(t, err) assert.NotNil(t, metadata) assert.Equal(t, metadata.GetPartitions(), uint32(1)) } + +func TestLookupSuccessWithMultipleHosts(t *testing.T) { + url, err := url.Parse("pulsar://host1,host2,host3:6650") + assert.NoError(t, err) + serviceNameResolver := NewPulsarServiceNameResolver(url) + + ls := NewLookupService(&mockedLookupRPCClient{ + t: t, + + expectedRequests: []pb.CommandLookupTopic{ + { + RequestId: proto.Uint64(1), + Topic: proto.String("my-topic"), + Authoritative: proto.Bool(false), + }, + }, + mockedResponses: []pb.CommandLookupTopicResponse{ + { + RequestId: proto.Uint64(1), + Response: responseType(pb.CommandLookupTopicResponse_Connect), + Authoritative: proto.Bool(true), + BrokerServiceUrl: proto.String("pulsar://broker-1:6650"), + }, + }, + }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + + lr, err := ls.Lookup("my-topic") + assert.NoError(t, err) + assert.NotNil(t, lr) + + assert.Equal(t, "pulsar://broker-1:6650", lr.LogicalAddr.String()) + assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String()) +} diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 0bb482c153..b51e4e3d91 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -56,7 +56,7 @@ type RPCClient interface { } type rpcClient struct { - serviceURL *url.URL + serviceNameResolver ServiceNameResolver pool ConnectionPool requestTimeout time.Duration requestIDGenerator uint64 @@ -66,22 +66,26 @@ type rpcClient struct { metrics *Metrics } -func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, +func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, pool ConnectionPool, requestTimeout time.Duration, logger log.Logger, metrics *Metrics) RPCClient { return &rpcClient{ - serviceURL: serviceURL, - pool: pool, - requestTimeout: requestTimeout, - log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), - metrics: metrics, + serviceNameResolver: serviceNameResolver, + pool: pool, + requestTimeout: requestTimeout, + log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), + metrics: metrics, } } func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { - - rpcResult, err := c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message) - if _, ok := err.(net.Error); ok { + host, err := c.serviceNameResolver.ResolveHost() + if err != nil { + c.log.Errorf("request host resolve failed with error: {%v}", err) + return nil, err + } + rpcResult, err := c.Request(host, host, requestID, cmdType, message) + if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") { // We can retry this kind of requests over a connection error because they're // not specific to a particular broker. backoff := Backoff{100 * time.Millisecond} @@ -92,9 +96,13 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_ retryTime = backoff.Next() c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout) time.Sleep(retryTime) - - rpcResult, err = c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message) - if _, ok := err.(net.Error); ok { + host, err = c.serviceNameResolver.ResolveHost() + if err != nil { + c.log.Errorf("Retrying request host resolve failed with error: {%v}", err) + continue + } + rpcResult, err = c.Request(host, host, requestID, cmdType, message) + if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") { continue } else { // We either succeeded or encountered a non connection error @@ -102,7 +110,6 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_ } } } - return rpcResult, err } diff --git a/pulsar/internal/service_name_resolver.go b/pulsar/internal/service_name_resolver.go new file mode 100644 index 0000000000..3b1209cc4e --- /dev/null +++ b/pulsar/internal/service_name_resolver.go @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "errors" + "fmt" + "math/rand" + "net/url" + "sync/atomic" + "time" + + log "github.com/sirupsen/logrus" +) + +type ServiceNameResolver interface { + ResolveHost() (*url.URL, error) + ResolveHostURI() (*PulsarServiceURI, error) + UpdateServiceURL(url *url.URL) error + GetServiceURI() *PulsarServiceURI + GetServiceURL() *url.URL + GetAddressList() []*url.URL +} + +type pulsarServiceNameResolver struct { + ServiceURI *PulsarServiceURI + ServiceURL *url.URL + CurrentIndex int32 + AddressList []*url.URL +} + +func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver { + r := &pulsarServiceNameResolver{} + err := r.UpdateServiceURL(url) + if err != nil { + log.Errorf("create pulsar service name resolver failed : %v", err) + } + return r +} + +func (r *pulsarServiceNameResolver) ResolveHost() (*url.URL, error) { + if r.AddressList == nil { + return nil, errors.New("no service url is provided yet") + } + if len(r.AddressList) == 0 { + return nil, fmt.Errorf("no hosts found for service url : %v", r.ServiceURL) + } + if len(r.AddressList) == 1 { + return r.AddressList[0], nil + } + idx := (r.CurrentIndex + 1) % int32(len(r.AddressList)) + atomic.StoreInt32(&r.CurrentIndex, idx) + return r.AddressList[idx], nil +} + +func (r *pulsarServiceNameResolver) ResolveHostURI() (*PulsarServiceURI, error) { + host, err := r.ResolveHost() + if err != nil { + return nil, err + } + hostURL := host.Scheme + "://" + host.Hostname() + ":" + host.Port() + return NewPulsarServiceURIFromURIString(hostURL) +} + +func (r *pulsarServiceNameResolver) UpdateServiceURL(u *url.URL) error { + uri, err := NewPulsarServiceURIFromURL(u) + if err != nil { + log.Errorf("invalid service-url instance %s provided %v", u, err) + return err + } + + hosts := uri.ServiceHosts + addresses := []*url.URL{} + for _, host := range hosts { + hostURL := uri.URL.Scheme + "://" + host + u, err := url.Parse(hostURL) + if err != nil { + log.Errorf("invalid host-url %s provided %v", hostURL, err) + return err + } + addresses = append(addresses, u) + } + r.AddressList = addresses + r.ServiceURL = u + r.ServiceURI = uri + rand.Seed(time.Now().Unix()) // initialize global pseudo random generator + atomic.StoreInt32(&r.CurrentIndex, int32(rand.Intn(len(addresses)))) + return nil +} + +func (r *pulsarServiceNameResolver) GetServiceURI() *PulsarServiceURI { + return r.ServiceURI +} + +func (r *pulsarServiceNameResolver) GetServiceURL() *url.URL { + return r.ServiceURL +} + +func (r *pulsarServiceNameResolver) GetAddressList() []*url.URL { + return r.AddressList +} diff --git a/pulsar/internal/service_name_resolver_test.go b/pulsar/internal/service_name_resolver_test.go new file mode 100644 index 0000000000..e1906339c5 --- /dev/null +++ b/pulsar/internal/service_name_resolver_test.go @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "net/url" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResolveBeforeUpdateServiceUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + u, err := resolver.ResolveHost() + assert.Nil(t, u) + assert.NotNil(t, err) + assert.EqualError(t, err, "no service url is provided yet") +} + +func TestResolveUriBeforeUpdateServiceUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + u, err := resolver.ResolveHostURI() + assert.Nil(t, u) + assert.NotNil(t, err) + assert.EqualError(t, err, "no service url is provided yet") +} + +func TestUpdateInvalidServiceUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + url, _ := url.Parse("pulsar:///") + err := resolver.UpdateServiceURL(url) + assert.NotNil(t, err) + assert.Empty(t, resolver.GetServiceURL()) + assert.Nil(t, resolver.GetServiceURI()) +} + +func TestSimpleHostUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + serviceURL, _ := url.Parse("pulsar://host1:6650") + err := resolver.UpdateServiceURL(serviceURL) + assert.Nil(t, err) + assert.Equal(t, serviceURL, resolver.GetServiceURL()) + expectedURI, err := NewPulsarServiceURIFromURL(serviceURL) + assert.Nil(t, err) + assert.Equal(t, expectedURI, resolver.GetServiceURI()) + actualHost, err := resolver.ResolveHost() + assert.Nil(t, err) + assert.Equal(t, "host1", actualHost.Hostname()) + assert.Equal(t, "6650", actualHost.Port()) + + newServiceURL, _ := url.Parse("pulsar://host2:6650") + err = resolver.UpdateServiceURL(newServiceURL) + assert.Nil(t, err) + assert.Equal(t, newServiceURL, resolver.GetServiceURL()) + expectedURI, err = NewPulsarServiceURIFromURL(newServiceURL) + assert.Nil(t, err) + assert.Equal(t, expectedURI, resolver.GetServiceURI()) + actualHost, err = resolver.ResolveHost() + assert.Nil(t, err) + assert.Equal(t, "host2", actualHost.Hostname()) + assert.Equal(t, "6650", actualHost.Port()) +} + +func TestMultipleHostsUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + serviceURL, _ := url.Parse("pulsar://host1:6650,host2:6650") + err := resolver.UpdateServiceURL(serviceURL) + assert.Nil(t, err) + assert.Equal(t, serviceURL, resolver.GetServiceURL()) + expectedURI, err := NewPulsarServiceURIFromURL(serviceURL) + assert.Nil(t, err) + assert.Equal(t, expectedURI, resolver.GetServiceURI()) + host1, _ := url.Parse("pulsar://host1:6650") + host2, _ := url.Parse("pulsar://host2:6650") + host1uri, _ := NewPulsarServiceURIFromURIString("pulsar://host1:6650") + host2uri, _ := NewPulsarServiceURIFromURIString("pulsar://host2:6650") + assert.Contains(t, resolver.GetAddressList(), host1) + assert.Contains(t, resolver.GetAddressList(), host2) + hosts := []*url.URL{host1, host2} + hosturis := []*PulsarServiceURI{host1uri, host2uri} + for i := 0; i < 10; i++ { + host, err := resolver.ResolveHost() + assert.Nil(t, err) + hosturi, err := resolver.ResolveHostURI() + assert.Nil(t, err) + assert.Contains(t, hosts, host) + assert.Contains(t, hosturis, hosturi) + } +} + +func TestMultipleHostsTlsUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + serviceURL, _ := url.Parse("pulsar+ssl://host1:6651,host2:6651") + err := resolver.UpdateServiceURL(serviceURL) + assert.Nil(t, err) + assert.Equal(t, serviceURL, resolver.GetServiceURL()) + expectedURI, err := NewPulsarServiceURIFromURL(serviceURL) + assert.Nil(t, err) + assert.Equal(t, expectedURI, resolver.GetServiceURI()) + host1, _ := url.Parse("pulsar+ssl://host1:6651") + host2, _ := url.Parse("pulsar+ssl://host2:6651") + host1uri, _ := NewPulsarServiceURIFromURIString("pulsar+ssl://host1:6651") + host2uri, _ := NewPulsarServiceURIFromURIString("pulsar+ssl://host2:6651") + assert.Contains(t, resolver.GetAddressList(), host1) + assert.Contains(t, resolver.GetAddressList(), host2) + hosts := []*url.URL{host1, host2} + hosturis := []*PulsarServiceURI{host1uri, host2uri} + for i := 0; i < 10; i++ { + host, err := resolver.ResolveHost() + assert.Nil(t, err) + hosturi, err := resolver.ResolveHostURI() + assert.Nil(t, err) + assert.Contains(t, hosts, host) + assert.Contains(t, hosturis, hosturi) + } +} diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go new file mode 100644 index 0000000000..7b7533987a --- /dev/null +++ b/pulsar/internal/service_uri.go @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "errors" + "fmt" + "net" + "net/url" + "strings" + + log "github.com/sirupsen/logrus" +) + +const ( + BinaryService = "pulsar" + HTTPService = "http" + HTTPSService = "https" + SSLService = "ssl" + BinaryPort = 6650 + BinaryTLSPort = 6651 + HTTPPort = 80 + HTTPSPort = 443 +) + +type PulsarServiceURI struct { + ServiceName string + ServiceInfos []string + ServiceHosts []string + servicePath string + URL *url.URL +} + +func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error) { + u, err := fromString(uri) + if err != nil { + log.Error(err) + return nil, err + } + return u, nil +} + +func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) { + u, err := fromURL(url) + if err != nil { + log.Error(err) + return nil, err + } + return u, nil +} + +func fromString(uriStr string) (*PulsarServiceURI, error) { + if uriStr == "" || len(uriStr) == 0 { + return nil, errors.New("service uriStr string is null") + } + if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") { + // deal with ipv6 address + hosts := strings.FieldsFunc(uriStr, splitURI) + if len(hosts) > 1 { + // deal with ipv6 address + firstHost := hosts[0] + lastHost := hosts[len(hosts)-1] + hasPath := strings.Contains(lastHost, "/") + path := "" + if hasPath { + idx := strings.Index(lastHost, "/") + path = lastHost[idx:] + } + firstHost += path + url, err := url.Parse(firstHost) + if err != nil { + return nil, err + } + serviceURI, err := fromURL(url) + if err != nil { + return nil, err + } + var mHosts []string + var multiHosts []string + mHosts = append(mHosts, serviceURI.ServiceHosts[0]) + mHosts = append(mHosts, hosts[1:]...) + + for _, v := range mHosts { + h, err := validateHostName(serviceURI.ServiceName, serviceURI.ServiceInfos, v) + if err == nil { + multiHosts = append(multiHosts, h) + } else { + return nil, err + } + } + + return &PulsarServiceURI{ + serviceURI.ServiceName, + serviceURI.ServiceInfos, + multiHosts, + serviceURI.servicePath, + serviceURI.URL, + }, nil + } + } + + url, err := url.Parse(uriStr) + if err != nil { + return nil, err + } + + return fromURL(url) +} + +func fromURL(url *url.URL) (*PulsarServiceURI, error) { + if url == nil { + return nil, errors.New("service url instance is null") + } + + if url.Host == "" || len(url.Host) == 0 { + return nil, errors.New("service host is null") + } + + var serviceName string + var serviceInfos []string + scheme := url.Scheme + if scheme != "" { + scheme = strings.ToLower(scheme) + schemeParts := strings.Split(scheme, "+") + serviceName = schemeParts[0] + serviceInfos = schemeParts[1:] + } + + var serviceHosts []string + hosts := strings.FieldsFunc(url.Host, splitURI) + for _, v := range hosts { + h, err := validateHostName(serviceName, serviceInfos, v) + if err == nil { + serviceHosts = append(serviceHosts, h) + } else { + return nil, err + } + } + + return &PulsarServiceURI{ + serviceName, + serviceInfos, + serviceHosts, + url.Path, + url, + }, nil +} + +func splitURI(r rune) bool { + return r == ',' || r == ';' +} + +func validateHostName(serviceName string, serviceInfos []string, hostname string) (string, error) { + uri, err := url.Parse("dummyscheme://" + hostname) + if err != nil { + return "", err + } + host := uri.Hostname() + if strings.Contains(hostname, "[") && strings.Contains(hostname, "]") { + host = fmt.Sprintf("[%s]", host) + } + if host == "" || uri.Scheme == "" { + return "", errors.New("Invalid hostname : " + hostname) + } + + port := uri.Port() + if uri.Port() == "" { + p := getServicePort(serviceName, serviceInfos) + if p == -1 { + return "", fmt.Errorf("invalid port : %d", p) + } + port = fmt.Sprint(p) + } + result := host + ":" + port + _, _, err = net.SplitHostPort(result) + if err != nil { + return "", err + } + return result, nil +} + +func getServicePort(serviceName string, serviceInfos []string) int { + switch strings.ToLower(serviceName) { + case BinaryService: + if len(serviceInfos) == 0 { + return BinaryPort + } else if len(serviceInfos) == 1 && strings.ToLower(serviceInfos[0]) == SSLService { + return BinaryTLSPort + } + case HTTPService: + return HTTPPort + case HTTPSService: + return HTTPSPort + } + return -1 +} diff --git a/pulsar/internal/service_uri_test.go b/pulsar/internal/service_uri_test.go new file mode 100644 index 0000000000..445b325406 --- /dev/null +++ b/pulsar/internal/service_uri_test.go @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInvalidServiceUris(t *testing.T) { + uris := []string{ + "://localhost:6650", // missing scheme + "pulsar:///", // missing authority + "pulsar://localhost:6650:6651/", // invalid hostname pair + "pulsar://localhost:xyz/", // invalid port + "pulsar://localhost:-6650/", // negative port + "pulsar://fec0:0:0:ffff::1:6650", // missing brackets + } + + for _, uri := range uris { + testInvalidServiceURI(t, uri) + } +} + +func TestEmptyServiceUriString(t *testing.T) { + u, err := NewPulsarServiceURIFromURIString("") + assert.Nil(t, u) + assert.NotNil(t, err) +} + +func TestNullServiceUrlInstance(t *testing.T) { + u, err := NewPulsarServiceURIFromURL(nil) + assert.Nil(t, u) + assert.NotNil(t, err) +} + +func TestMissingServiceName(t *testing.T) { + serviceURI := "//localhost:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "", nil, []string{"localhost:6650"}, "/path/to/namespace", "") +} + +func TestEmptyPath(t *testing.T) { + serviceURI := "pulsar://localhost:6650" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "", "") +} + +func TestRootPath(t *testing.T) { + serviceURI := "pulsar://localhost:6650/" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "/", "") +} + +func TestUserInfo(t *testing.T) { + serviceURI := "pulsar://pulsaruser@localhost:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "/path/to/namespace", "pulsaruser") +} + +func TestIpv6Uri(t *testing.T) { + serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", + "pulsaruser") +} + +func TestIpv6UriWithoutPulsarPort(t *testing.T) { + serviceURI := "pulsar://[fec0:0:0:ffff::1]/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "") +} + +func TestMultiIpv6Uri(t *testing.T) { + serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650,[fec0:0:0:ffff::2]:6650;[fec0:0:0:ffff::3]:6650" + + "/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, + []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", + "pulsaruser") +} + +func TestMultiIpv6UriWithoutPulsarPort(t *testing.T) { + serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1],[fec0:0:0:ffff::2];[fec0:0:0:ffff::3]/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, + []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", + "pulsaruser") +} + +func TestMultipleHostsSemiColon(t *testing.T) { + serviceURI := "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "") +} + +func TestMultipleHostsComma(t *testing.T) { + serviceURI := "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "") +} + +func TestMultipleHostsWithoutPulsarPorts(t *testing.T) { + serviceURI := "pulsar://host1,host2,host3/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "") +} + +func TestMultipleHostsWithoutPulsarTlsPorts(t *testing.T) { + serviceURI := "pulsar+ssl://host1,host2,host3/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"}, + "/path/to/namespace", "") +} + +func TestMultipleHostsWithoutHttpPorts(t *testing.T) { + serviceURI := "http://host1,host2,host3/path/to/namespace" + assertServiceURI(t, serviceURI, "http", nil, []string{"host1:80", "host2:80", "host3:80"}, "/path/to/namespace", "") +} + +func TestMultipleHostsWithoutHttpsPorts(t *testing.T) { + serviceURI := "https://host1,host2,host3/path/to/namespace" + assertServiceURI(t, serviceURI, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace", + "") +} + +func TestMultipleHostsMixedPorts(t *testing.T) { + serviceURI := "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, + "/path/to/namespace", "") +} + +func TestMultipleHostsMixed(t *testing.T) { + serviceURI := "pulsar://host1:6640,host2,host3:6660/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, + "/path/to/namespace", "") +} + +func TestUserInfoWithMultipleHosts(t *testing.T) { + serviceURI := "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "pulsaruser") +} + +func testInvalidServiceURI(t *testing.T, serviceURI string) { + u, err := NewPulsarServiceURIFromURIString(serviceURI) + t.Logf("testInvalidServiceURI %s", serviceURI) + assert.Nil(t, u) + assert.NotNil(t, err) +} + +func assertServiceURI(t *testing.T, serviceURI, expectedServiceName string, + expectedServiceInfo, expectedServiceHosts []string, expectedServicePath, expectedServiceUser string) { + uri, err := NewPulsarServiceURIFromURIString(serviceURI) + assert.Nil(t, err) + assert.NotNil(t, serviceURI) + assert.Equal(t, expectedServiceName, uri.ServiceName) + assert.Equal(t, expectedServicePath, uri.servicePath) + if expectedServiceUser != "" { + assert.Equal(t, expectedServiceUser, uri.URL.User.Username()) + } + assert.ElementsMatch(t, expectedServiceInfo, uri.ServiceInfos) + assert.ElementsMatch(t, expectedServiceHosts, uri.ServiceHosts) +} diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 5708cad581..4d62cac169 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -621,7 +621,7 @@ func TestProducerMetadata(t *testing.T) { } producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, - Name: "my-producer", + Name: "meta-data-producer", Properties: props, }) if err != nil { diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index cd97964a07..f72ba1d1a0 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -586,3 +586,55 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) { assert.False(t, reader.HasNext()) } + +func TestReaderWithMultiHosts(t *testing.T) { + // Multi hosts included an unreached port and the actual port for verify retry logic + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6600,localhost:6650", + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + } + + // create reader on 5th message (not included) + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + + assert.Nil(t, err) + defer reader.Close() + + i := 0 + for reader.HasNext() { + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + expectMsg := fmt.Sprintf("hello-%d", i) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + + i++ + } + + assert.Equal(t, 10, i) +}