Skip to content

Commit

Permalink
Integrate CBGT for DCP sharding - issue #922
Browse files Browse the repository at this point in the history
  • Loading branch information
Traun Leyden committed Aug 14, 2015
1 parent 7b84950 commit 59b33d3
Show file tree
Hide file tree
Showing 20 changed files with 789 additions and 114 deletions.
15 changes: 15 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,18 @@
[submodule "src/github.com/couchbase/sg-bucket"]
path = src/github.com/couchbase/sg-bucket
url = https://github.com/couchbase/sg-bucket.git
[submodule "src/github.com/couchbaselabs/cbgt"]
path = src/github.com/couchbaselabs/cbgt
url = https://github.com/couchbaselabs/cbgt.git
[submodule "src/github.com/couchbase/clog"]
path = src/github.com/couchbase/clog
url = https://github.com/couchbase/clog.git
[submodule "src/github.com/couchbaselabs/blance"]
path = src/github.com/couchbaselabs/blance
url = https://github.com/couchbaselabs/blance.git
[submodule "src/github.com/rcrowley/go-metrics"]
path = src/github.com/rcrowley/go-metrics
url = https://github.com/rcrowley/go-metrics.git
[submodule "src/github.com/couchbase/cbauth"]
path = src/github.com/couchbase/cbauth
url = https://github.com/couchbase/cbauth.git
1 change: 1 addition & 0 deletions src/github.com/couchbase/cbauth
Submodule cbauth added at 92c0ce
1 change: 1 addition & 0 deletions src/github.com/couchbase/clog
Submodule clog added at e0f356
230 changes: 223 additions & 7 deletions src/github.com/couchbase/sync_gateway/base/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
package base

import (
"encoding/json"
"errors"
"fmt"
"log"
"regexp"
"strconv"
"strings"
Expand All @@ -21,11 +23,17 @@ import (
"github.com/couchbase/gomemcached"
"github.com/couchbase/gomemcached/client"
"github.com/couchbase/sg-bucket"

"github.com/couchbaselabs/cbgt"
"github.com/couchbaselabs/cbgt/cmd"
"github.com/couchbaselabs/walrus"
)

const TapFeedType = "tap"
const DcpFeedType = "dcp"
// Values recognised in BucketSpec.FeedType. StartTapFeed switches on the
// configured value to pick between a DCP feed, a CBGT-sharded DCP feed, and
// (for any other value) the TAP feed.
const (
TapFeedType = "tap"
DcpFeedType = "dcp"
DcpShardFeedType = "dcpshard"
)

func init() {
// Increase max memcached request size to 20M bytes, to support large docs (attachments!)
Expand All @@ -42,20 +50,37 @@ type AuthHandler couchbase.AuthHandler
// BucketSpec describes a bucket to connect to and how to feed from it.
// FeedType is compared against the feed type constants when a feed is
// started, and FeedParams carries the CBGT settings that are read when the
// sharded DCP feed is set up.
type BucketSpec struct {
Server, PoolName, BucketName, FeedType string
Auth AuthHandler
FeedParams FeedParams
}

// FeedParams holds the settings used by CBGT to determine the sharding
// factor and other properties of the feed.
type FeedParams struct {
// Copied into cbgt.PlanParams.MaxPartitionsPerPIndex by PlanParams().
MaxPartitionsPerPIndex int `json:"max_partitions_per_pindex"`
// Copied into cbgt.PlanParams.NumReplicas by PlanParams().
NumReplicas int `json:"num_replicas"`
// Passed as the data directory to cmd.MainUUID, cmd.MainCfg and
// cbgt.NewManager when the CBGT manager is initialized.
DataDir string `json:"data_dir"`
}

// PlanParams returns a cbgt.PlanParams populated with the partition and
// replica counts from f. All other fields are left at their zero values.
func (f FeedParams) PlanParams() cbgt.PlanParams {
	var plan cbgt.PlanParams
	plan.MaxPartitionsPerPIndex = f.MaxPartitionsPerPIndex
	plan.NumReplicas = f.NumReplicas
	return plan
}

// CouchbaseBucket is an implementation of sgbucket.Bucket that talks to a
// Couchbase server. It embeds the go-couchbase bucket and keeps the
// BucketSpec, whose FeedType and FeedParams are read when a feed is started.
type CouchbaseBucket struct {
*couchbase.Bucket
spec BucketSpec // keep a copy of the BucketSpec for DCP usage
*couchbase.Bucket // the underlying go-couchbase bucket
spec BucketSpec // keep a copy of the BucketSpec for DCP usage
}

// couchbaseFeedImpl pairs an embedded go-couchbase TapFeed with the channel
// of sgbucket.TapEvent values that its Events() method returns.
type couchbaseFeedImpl struct {
*couchbase.TapFeed
events <-chan sgbucket.TapEvent
}

var versionString string
var (
versionString string
)

func (feed *couchbaseFeedImpl) Events() <-chan sgbucket.TapEvent {
return feed.events
Expand Down Expand Up @@ -112,19 +137,210 @@ func (bucket CouchbaseBucket) View(ddoc, name string, params map[string]interfac
}

func (bucket CouchbaseBucket) StartTapFeed(args sgbucket.TapArguments) (sgbucket.TapFeed, error) {

// Uses tap by default, unless DCP is explicitly specified
if bucket.spec.FeedType == DcpFeedType {
switch bucket.spec.FeedType {
case DcpFeedType:
feed, err := bucket.StartDCPFeed(args)
if err != nil {
Warn("Unable to start DCP feed - reverting to using TAP feed: %s", err)
return bucket.StartCouchbaseTapFeed(args)
}
LogTo("Feed", "Using DCP feed for bucket: %q", bucket.GetName())
return feed, nil
} else {

case DcpShardFeedType:

// TODO -- is this guaranteed to _only_ be called once per bucket
// throughout lifetime of Sync Gateway? If not, this will need to get reworked.

// CBGT initialization

// Create the TapEvent feed channel that will be passed back to the caller
eventFeed := make(chan sgbucket.TapEvent, 10)

// wrap functions with closures that have references to channel
newSyncGatewayPIndexImpl := NewSyncGatewayPIndexImplFactory(eventFeed, bucket, args)
openSyncGatewayPIndexImpl := OpenSyncGatewayPIndexImplFactory(eventFeed, bucket, args)

// register with CBGT
cbgt.RegisterPIndexImplType(IndexTypeSyncGateway, &cbgt.PIndexImplType{
New: newSyncGatewayPIndexImpl,
Open: openSyncGatewayPIndexImpl,
Count: nil,
Query: nil,
Description: "Sync Gateway Pindex",
})

if err := bucket.initCBGTManager(); err != nil {
log.Fatalf("Unable to initialize CBGT for sharded dcp: %v", err)
}

// create the index
alreadyExists, err := bucket.checkCBGTIndexExists(bucket.Name)
if err != nil {
log.Fatalf("Error checking if CBGT index exists: %v", err)
}
if !alreadyExists {
if err := bucket.createCBGTIndex(); err != nil {
log.Fatalf("Unable to initialize CBGT index: %v", err)
}
}

// - create a new CBGTDCPFeed and pass in the eventFeed channel
feed := &CBGTDCPFeed{
eventFeed: eventFeed,
}
return feed, nil

default:
LogTo("Feed", "Using TAP feed for bucket: %q (based on feed_type specified in config file", bucket.GetName())
return bucket.StartCouchbaseTapFeed(args)

}

}

// initCBGTManager builds the CBGT cfg for this bucket, creates the CBGT
// manager, stores it in CBGTManager and starts it. Any failure along the way
// is returned to the caller.
func (bucket CouchbaseBucket) initCBGTManager() error {

	dataDir := bucket.spec.FeedParams.DataDir

	user, pass, _ := bucket.getDcpAuthHandler().GetCredentials()
	couchbaseUrl, err := CouchbaseUrlWithAuth(bucket.spec.Server, user, pass, bucket.Name)
	if err != nil {
		return err
	}

	// the connection string, eg: "couchbase:http://user:pass@localhost:8091"
	connect := fmt.Sprintf("couchbase:%v", couchbaseUrl)

	nodeUUID, err := cmd.MainUUID(IndexTypeSyncGateway, dataDir)
	if err != nil {
		return err
	}

	// this tells CBGT that we are bringing a new CBGT node online
	const register = "wanted"

	// The uuid doubles as bindHttp so that the user doesn't have to
	// configure it; as far as the REST Api interface goes, whatever is
	// configured in adminInterface is used anyway.
	// More info here:
	//   https://github.com/couchbaselabs/cbgt/issues/1
	//   https://github.com/couchbaselabs/cbgt/issues/25
	bindHttp := nodeUUID

	cfg, err := cmd.MainCfg(IndexTypeSyncGateway, connect, bindHttp, register, dataDir)
	if err != nil {
		return err
	}

	// SetKeyPrefix is only available on the concrete CfgCB type
	cfgCb, isCfgCB := cfg.(*cbgt.CfgCB)
	if !isCfgCB {
		return fmt.Errorf("Type assertion failure")
	}

	// this will cause CBGT to name the cfg document _sync:cfg
	cfgCb.SetKeyPrefix("_sync:")

	// no event handlers are installed
	var eventHandlers cbgt.ManagerEventHandlers

	// refresh it so we have a fresh copy
	if refreshErr := cfg.Refresh(); refreshErr != nil {
		return refreshErr
	}

	CBGTManager = cbgt.NewManager(
		cbgt.VERSION,
		cfgCb,
		nodeUUID,
		[]string{"feed", "janitor", "pindex", "planner"}, // tags
		"", // container
		1,  // weight: this would allow us to have more pindexes serviced by this node
		"", // extras
		bindHttp,
		dataDir,
		bucket.spec.Server, // server, or use "." (don't bother checking)
		eventHandlers,
	)

	if startErr := CBGTManager.Start(register); startErr != nil {
		log.Printf("Manager.Start() returned error: %v", startErr)
		return startErr
	}

	return nil

}

// checkCBGTIndexExists reports whether the index definitions returned by
// CBGTManager.GetIndexDefs contain a non-nil entry named indexName. An error
// from GetIndexDefs is returned as-is together with false.
func (bucket CouchbaseBucket) checkCBGTIndexExists(indexName string) (bool, error) {

	_, defsByName, err := CBGTManager.GetIndexDefs(true)
	if err != nil {
		return false, err
	}

	if defsByName[indexName] == nil {
		return false, nil
	}
	return true, nil

}

// createCBGTIndex creates an "index" in CBGT, which will cause it to start
// streaming DCP events to us for our shard of the full DCP stream. The index
// is named after the bucket and uses the bucket's DCP credentials as source
// params. An error whose text contains "exists" is logged and treated as
// success; any other error is logged and returned.
func (bucket CouchbaseBucket) createCBGTIndex() error {

	dcpUser, dcpPassword, _ := bucket.getDcpAuthHandler().GetCredentials()

	feedParams := cbgt.NewDCPFeedParams()
	feedParams.AuthUser = dcpUser
	feedParams.AuthPassword = dcpPassword

	encodedFeedParams, err := json.Marshal(feedParams)
	if err != nil {
		return err
	}

	planParams := bucket.spec.FeedParams.PlanParams()

	err = CBGTManager.CreateIndex(
		SourceTypeCouchbase,       // sourceType
		bucket.Name,               // sourceName
		bucket.UUID,               // sourceUUID
		string(encodedFeedParams), // sourceParams
		IndexTypeSyncGateway,      // indexType
		bucket.Name,               // indexName
		"",                        // indexParams
		planParams,                // planParams
		"",                        // prevIndexUUID
	)
	if err == nil {
		return nil
	}

	LogTo("DCP", "Error creating CBGT index: %v", err)

	// an "index exists" error is ignored; everything else is propagated
	if strings.Contains(err.Error(), "exists") {
		LogTo("DCP", "Unable to create CBGT index, already exists: %v", err)
		return nil
	}

	return err

}

func (bucket CouchbaseBucket) StartCouchbaseTapFeed(args sgbucket.TapArguments) (sgbucket.TapFeed, error) {
Expand Down
23 changes: 19 additions & 4 deletions src/github.com/couchbase/sync_gateway/base/dcp_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,26 @@ type dcpAuth struct {
// Reviewed with Steve and he said that the auth handling in couchbase server is undergoing changes
// as part of Sherlock, so we may need to revisit once they've stabilized on an approach.
func (a dcpAuth) GetCredentials() (string, string, string) {
if a.Username == "" {
return a.BucketName, "", a.BucketName
} else {
return a.Username, a.Password, a.BucketName

var (
username string
password string
)

// as long as it's not the default bucket, if the username is empty
// then set the username to the bucketname. (if it's the default bucket, the
// username should just be empty rather than "default")
if a.Username == "" && a.BucketName != "default" {
username = a.BucketName
}

// if the username is empty, then the password should be empty too
if username == "" {
password = ""
}

return username, password, a.BucketName

}

type Receiver interface {
Expand Down
Loading

0 comments on commit 59b33d3

Please sign in to comment.