Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Passing the memcached auth to cbdatasource #55

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cbdatasource/cbdatasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ type BucketDataSourceStats struct {
TotWorkerAuth uint64
TotWorkerAuthErr uint64
TotWorkerAuthFail uint64
TotWorkerSelBktFail uint64
TotWorkerSelBktOk uint64
TotWorkerAuthOk uint64
TotWorkerUPROpenErr uint64
TotWorkerUPROpenOk uint64
Expand Down Expand Up @@ -352,7 +354,8 @@ type bucketDataSource struct {
bucketName string
bucketUUID string
vbucketIDs []uint16
auth couchbase.AuthHandler
auth couchbase.AuthHandler // auth for couchbase
authMemchd couchbase.AuthHandler // auth for memcached
receiver Receiver
options *BucketDataSourceOptions

Expand Down Expand Up @@ -392,6 +395,7 @@ func NewBucketDataSource(
bucketUUID string,
vbucketIDs []uint16,
auth couchbase.AuthHandler,
authMemchd couchbase.AuthHandler,
receiver Receiver,
options *BucketDataSourceOptions) (BucketDataSource, error) {
if len(serverURLs) < 1 {
Expand All @@ -416,6 +420,7 @@ func NewBucketDataSource(
bucketUUID: bucketUUID,
vbucketIDs: vbucketIDs,
auth: auth,
authMemchd: authMemchd,
receiver: receiver,
options: options,

Expand Down Expand Up @@ -720,8 +725,8 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int {
defer client.Close()
atomic.AddUint64(&d.stats.TotWorkerConnectOk, 1)

if d.auth != nil {
user, pswd, _ := d.auth.GetCredentials()
if d.authMemchd != nil {
user, pswd, _ := d.authMemchd.GetCredentials()
if user != "" {
atomic.AddUint64(&d.stats.TotWorkerAuth, 1)
res, err := client.Auth(user, pswd)
Expand All @@ -738,6 +743,13 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int {
}
atomic.AddUint64(&d.stats.TotWorkerAuthOk, 1)
}
_, err = client.SelectBucket(d.bucketName)
if err != nil {
atomic.AddUint64(&d.stats.TotWorkerSelBktFail, 1)
d.receiver.OnError(fmt.Errorf("worker select bucket err: %v", err))
return 0
}
atomic.AddUint64(&d.stats.TotWorkerSelBktOk, 1)
}

uprOpenName := d.options.Name
Expand Down