Skip to content

Commit

Permalink
[INLONG-10427][SDK] The Go SDK supports authentication for Manager ac…
Browse files Browse the repository at this point in the history
…cess (apache#10427)
  • Loading branch information
neolinli committed Jun 15, 2024
1 parent a97585e commit 868779d
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *client) initAll() error {
}

func (c *client) initDiscoverer() error {
dis, err := NewDiscoverer(c.options.URL, c.options.GroupID, c.options.UpdateInterval, c.options.Logger)
dis, err := NewDiscoverer(c.options.URL, c.options.GroupID, c.options.UpdateInterval, c.options.Logger, c.options.Auth)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package dataproxy

import (
"context"
"crypto/tls"
"encoding/json"
"errors"
Expand All @@ -32,8 +33,13 @@ import (
"github.com/go-resty/resty/v2"
)

// Auth dataproxy authentication interface
type Auth interface {
GetToken(ctx context.Context, groupID string) (key string, token string, err error)
}

// NewDiscoverer news a DataProxy discoverer
func NewDiscoverer(url, groupID string, lookupInterval time.Duration, log logger.Logger) (discoverer.Discoverer, error) {
func NewDiscoverer(url, groupID string, lookupInterval time.Duration, log logger.Logger, auth Auth) (discoverer.Discoverer, error) {
if url == "" {
return nil, errors.New("URL is not given")
}
Expand All @@ -55,6 +61,7 @@ func NewDiscoverer(url, groupID string, lookupInterval time.Duration, log logger
endpointListMap: make(map[string]discoverer.Endpoint),
eventHandlers: make(map[discoverer.EventHandler]struct{}),
log: log,
auth: auth,
}

// initial lookup
Expand All @@ -74,6 +81,7 @@ type dataProxyDiscoverer struct {
eventHandlers map[discoverer.EventHandler]struct{}
closeFunc func()
log logger.Logger
auth Auth
}

func (d *dataProxyDiscoverer) GetEndpoints() discoverer.EndpointList {
Expand Down Expand Up @@ -189,26 +197,46 @@ func (d *dataProxyDiscoverer) update() {
func (d *dataProxyDiscoverer) get(retry int) (*cluster, error) {
reqURL := fmt.Sprintf("%s/%s?protocolType=tcp", d.url, d.groupID)
client := resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
req := client.R()
if d.auth != nil {
key, token, err := d.auth.GetToken(context.Background(), d.groupID)
if err != nil {
return nil, fmt.Errorf("failed to get auth token. %w", err)
}
req = req.SetHeader(key, token)
}

httpRsp, err := client.R().Post(reqURL)
httpRsp, err := req.Post(reqURL)
if err != nil {
d.log.Error("get server endpoint list failed:", err)
if retry <= 1 {
return nil, err
return nil, fmt.Errorf("failed to required dataProxy service endpoint list. %w", err)
}

retry--
return d.get(retry)
}

if !httpRsp.IsSuccess() {
err := fmt.Errorf("dataProxy response error. http code:%d body:%s", httpRsp.StatusCode(), httpRsp.Body())
d.log.Error(err)
return nil, err
}

traceID := httpRsp.Header().Get("trace-id")

rsp := &response{}
err = json.Unmarshal(httpRsp.Body(), rsp)
if err != nil {
err = fmt.Errorf("failed to unmarshal dataProxy service endpoint data. trace-id:%s %w", traceID, err)
d.log.Error(err)
return nil, err
}

if !rsp.Success {
return nil, errors.New(rsp.ErrMsg)
err = fmt.Errorf("dataProxy response error. trace-id:%s %s", traceID, rsp.ErrMsg)
d.log.Error(err)
return nil, err
}

return &rsp.Data, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Options struct {
BlockIfQueueIsFull bool // whether Send and SendAsync block if producer's message queue is full, default: false
AddColumns map[string]string // addition columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all the message will be added 2 more columns with worldid=xxx and ip=yyy
addColumnStr string // the string format of the AddColumns, just a cache, used internal
Auth Auth // dataproxy authentication interface
}

// ValidateAndSetDefault validates an options and set up the default values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,13 @@ func WithMetricsRegistry(reg prometheus.Registerer) Option {
o.MetricsRegistry = reg
}
}

// WithAtuh sets Atuh
func WithAtuh(auth Auth) Option {
return func(o *Options) {
if auth == nil {
return
}
o.Auth = auth
}
}

0 comments on commit 868779d

Please sign in to comment.