Skip to content

Commit

Permalink
CBG-3702: setup 7.6.1 buckets for integration tests (#6704)
Browse files Browse the repository at this point in the history
* CBG-3702: setup 7.6.1 buckets for integration tests
  • Loading branch information
gregns1 authored and bbrks committed Mar 28, 2024
1 parent ff6b44e commit f396187
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 74 deletions.
35 changes: 7 additions & 28 deletions base/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type CouchbaseBucketStore interface {
GetStatsVbSeqno(maxVbno uint16, useAbsHighSeqNo bool) (uuids map[uint16]uint64, highSeqnos map[uint16]uint64, seqErr error)

// MgmtRequest uses the CouchbaseBucketStore's http client to make an http request against a management endpoint.
MgmtRequest(ctx context.Context, method, uri, contentType string, body io.Reader) (*http.Response, error)
MgmtRequest(ctx context.Context, method, uri, contentType string, body io.Reader) ([]byte, int, error)
}

func AsCouchbaseBucketStore(b Bucket) (CouchbaseBucketStore, bool) {
Expand Down Expand Up @@ -398,14 +398,7 @@ func getMaxTTL(ctx context.Context, store CouchbaseBucketStore) (int, error) {
}

uri := fmt.Sprintf("/pools/default/buckets/%s", store.GetSpec().BucketName)
resp, err := store.MgmtRequest(ctx, http.MethodGet, uri, "application/json", nil)
if err != nil {
return -1, err
}

defer func() { _ = resp.Body.Close() }()

respBytes, err := io.ReadAll(resp.Body)
respBytes, _, err := store.MgmtRequest(ctx, http.MethodGet, uri, "application/json", nil)
if err != nil {
return -1, err
}
Expand All @@ -419,14 +412,7 @@ func getMaxTTL(ctx context.Context, store CouchbaseBucketStore) (int, error) {

// Get the Server UUID of the bucket, this is also known as the Cluster UUID
func GetServerUUID(ctx context.Context, store CouchbaseBucketStore) (uuid string, err error) {
resp, err := store.MgmtRequest(ctx, http.MethodGet, "/pools", "application/json", nil)
if err != nil {
return "", err
}

defer func() { _ = resp.Body.Close() }()

respBytes, err := io.ReadAll(resp.Body)
respBytes, _, err := store.MgmtRequest(ctx, http.MethodGet, "/pools", "application/json", nil)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -474,22 +460,15 @@ func retrievePurgeInterval(ctx context.Context, bucket CouchbaseBucketStore, uri
PurgeInterval float64 `json:"purgeInterval,omitempty"`
}

resp, err := bucket.MgmtRequest(ctx, http.MethodGet, uri, "application/json", nil)
respBytes, statusCode, err := bucket.MgmtRequest(ctx, http.MethodGet, uri, "application/json", nil)
if err != nil {
return 0, err
}

defer func() { _ = resp.Body.Close() }()

if resp.StatusCode == http.StatusForbidden {
if statusCode == http.StatusForbidden {
WarnfCtx(ctx, "403 Forbidden attempting to access %s. Bucket user must have Bucket Full Access and Bucket Admin roles to retrieve metadata purge interval.", UD(uri))
} else if resp.StatusCode != http.StatusOK {
return 0, errors.New(resp.Status)
}

respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return 0, err
} else if statusCode != http.StatusOK {
return 0, errors.New(fmt.Sprintf("failed with status code, %d, statusCode", statusCode))
}

if err := JSONUnmarshal(respBytes, &purgeResponse); err != nil {
Expand Down
68 changes: 54 additions & 14 deletions base/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ func GetGocbV2BucketFromCluster(ctx context.Context, cluster *gocb.Cluster, spec
}
gocbv2Bucket.kvOps = make(chan struct{}, MaxConcurrentSingleOps*nodeCount*(*numPools))

// Query to see if mobile XDCR bucket setting is set and store on bucket object
err = gocbv2Bucket.queryHLVBucketSetting(ctx)
if err != nil {
return nil, err
}

return gocbv2Bucket, nil
}

Expand All @@ -185,6 +191,7 @@ type GocbV2Bucket struct {
queryOps chan struct{} // Manages max concurrent query ops
kvOps chan struct{} // Manages max concurrent kv ops
clusterCompatMajorVersion, clusterCompatMinorVersion uint64 // E.g: 6 and 0 for 6.0.3
supportsHLV bool // Flag to indicate with bucket supports mobile XDCR
}

var (
Expand Down Expand Up @@ -246,11 +253,36 @@ func (b *GocbV2Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
return isMinimumVersion(b.clusterCompatMajorVersion, b.clusterCompatMinorVersion, 7, 0)
case sgbucket.BucketStoreFeatureSystemCollections:
return isMinimumVersion(b.clusterCompatMajorVersion, b.clusterCompatMinorVersion, 7, 6)
case sgbucket.BucketStoreFeatureMobileXDCR:
return b.supportsHLV
default:
return false
}
}

// queryHLVBucketSetting sends request to server to check for enableCrossClusterVersioning bucket setting
func (b *GocbV2Bucket) queryHLVBucketSetting(ctx context.Context) error {
url := fmt.Sprintf("/pools/default/buckets/%s", b.GetName())
output, statusCode, err := b.MgmtRequest(ctx, http.MethodGet, url, "application/x-www-form-urlencoded", nil)
if err != nil || statusCode != http.StatusOK {
return fmt.Errorf("error executing query for mobile XDCR bucket setting, status code: %d error: %v output: %s", statusCode, err, string(output))
}

type bucket struct {
SupportsHLV *bool `json:"enableCrossClusterVersioning,omitempty"`
}
var bucketSettings bucket
err = JSONUnmarshal(output, &bucketSettings)
if err != nil {
return err
}
// In Server < 7.6.1 this field will not be present, but if it is not configured on the bucket, it will return false
if bucketSettings.SupportsHLV != nil {
b.supportsHLV = *bucketSettings.SupportsHLV
}
return nil
}

func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
groupID := ""
return StartGocbDCPFeed(ctx, b, b.Spec.BucketName, args, callback, dbStats, DCPMetadataStoreInMemory, groupID)
Expand Down Expand Up @@ -468,19 +500,21 @@ func (b *GocbV2Bucket) BucketName() string {
return b.GetName()
}

func (b *GocbV2Bucket) MgmtRequest(ctx context.Context, method, uri, contentType string, body io.Reader) (*http.Response, error) {
// MgmtRequest makes a request to the http couchbase management api. The uri is the non host part of the URL, such as /pools/default/buckets.
// This function will read the entire contents of the response and return the output bytes, the status code, and an error.
func (b *GocbV2Bucket) MgmtRequest(ctx context.Context, method, uri, contentType string, body io.Reader) ([]byte, int, error) {
if contentType == "" && body != nil {
return nil, errors.New("Content-type must be specified for non-null body.")
return nil, 0, errors.New("Content-type must be specified for non-null body.")
}

mgmtEp, err := GoCBBucketMgmtEndpoint(b)
if err != nil {
return nil, err
return nil, 0, err
}

req, err := http.NewRequest(method, mgmtEp+uri, body)
if err != nil {
return nil, err
return nil, 0, err
}

if contentType != "" {
Expand All @@ -491,7 +525,18 @@ func (b *GocbV2Bucket) MgmtRequest(ctx context.Context, method, uri, contentType
username, password, _ := b.Spec.Auth.GetCredentials()
req.SetBasicAuth(username, password)
}
return b.HttpClient(ctx).Do(req)
response, err := b.HttpClient(ctx).Do(req)
if err != nil {
return nil, response.StatusCode, err
}
defer func() { _ = response.Body.Close() }()

respBytes, err := io.ReadAll(response.Body)
if err != nil {
return nil, 0, err
}

return respBytes, response.StatusCode, nil
}

// This prevents Sync Gateway from overflowing gocb's pipeline
Expand Down Expand Up @@ -646,24 +691,19 @@ func (b *GocbV2Bucket) NamedDataStore(name sgbucket.DataStoreName) (sgbucket.Dat
// ServerMetrics returns all the metrics for couchbase server.
func (b *GocbV2Bucket) ServerMetrics(ctx context.Context) (map[string]*dto.MetricFamily, error) {
url := "/metrics/"
resp, err := b.MgmtRequest(ctx, http.MethodGet, url, "application/x-www-form-urlencoded", nil)
resp, statusCode, err := b.MgmtRequest(ctx, http.MethodGet, url, "application/x-www-form-urlencoded", nil)
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()

output, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("Could not read body from %s", url)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Could not get metrics from %s. %s %s -> (%d) %s", b.GetName(), http.MethodGet, url, resp.StatusCode, output)
if statusCode != http.StatusOK {
return nil, fmt.Errorf("Could not get metrics from %s. %s %s -> (%d) %s", b.GetName(), http.MethodGet, url, statusCode, string(resp))
}

// filter duplicates from couchbase server or TextToMetricFamilies will fail MB-43772
lines := map[string]struct{}{}
filteredOutput := []string{}
for _, line := range strings.Split(string(output), "\n") {
for _, line := range strings.Split(string(resp), "\n") {
_, ok := lines[line]
if ok {
continue
Expand Down
41 changes: 41 additions & 0 deletions base/main_test_bucket_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ package base
import (
"context"
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -62,6 +64,9 @@ type TestBucketPool struct {
skipCollections bool
useExistingBucket bool

// skipMobileXDCR may be true for older versions of Couchbase Server that don't support mobile XDCR enhancements
skipMobileXDCR bool

// when useDefaultScope is set, named collections are created in the default scope
useDefaultScope bool
}
Expand Down Expand Up @@ -125,6 +130,12 @@ func NewTestBucketPoolWithOptions(ctx context.Context, bucketReadierFunc TBPBuck
}
tbp.skipCollections = !useCollections

useMobileXDCR, err := tbp.cluster.mobileXDCRCompatible(ctx)
if err != nil {
tbp.Fatalf(ctx, "%s", err)
}
tbp.skipMobileXDCR = !useMobileXDCR

tbp.verbose.Set(tbpVerbose())

// Start up an async readier worker to process dirty buckets
Expand Down Expand Up @@ -433,6 +444,29 @@ func (tbp *TestBucketPool) emptyPreparedStatements(ctx context.Context, b Bucket
}
}

// setXDCRBucketSetting sets the bucket setting "enableCrossClusterVersioning" for mobile XDCR
func (tbp *TestBucketPool) setXDCRBucketSetting(ctx context.Context, bucket Bucket) {

tbp.Logf(ctx, "Setting crossClusterVersioningEnabled=true")

store, ok := AsCouchbaseBucketStore(bucket)
if !ok {
tbp.Fatalf(ctx, "unable to get server management endpoints. Underlying bucket type was not GoCBBucket")
}

posts := url.Values{}
posts.Add("enableCrossClusterVersioning", "true")

url := fmt.Sprintf("/pools/default/buckets/%s", store.GetName())
output, statusCode, err := store.MgmtRequest(ctx, http.MethodPost, url, "application/x-www-form-urlencoded", strings.NewReader(posts.Encode()))
if err != nil {
tbp.Fatalf(ctx, "request to mobile XDCR bucket setting failed, status code: %d error: %v output: %s", statusCode, err, string(output))
}
if statusCode != http.StatusOK {
tbp.Fatalf(ctx, "request to mobile XDCR bucket setting failed with status code, %d", statusCode)
}
}

// createCollections will create a set of test collections on the bucket, if enabled...
func (tbp *TestBucketPool) createCollections(ctx context.Context, bucket Bucket) {
// If we're able to use collections, the test bucket pool will also create N collections per bucket - rather than just getting the default collection ready.
Expand Down Expand Up @@ -495,6 +529,7 @@ func (tbp *TestBucketPool) createTestBuckets(numBuckets, bucketQuotaMB int, buck
if err != nil {
tbp.Fatalf(ctx, "Timed out trying to open new bucket: %v", err)
}

openBucketsLock.Lock()
openBuckets[bucketName] = bucket
openBucketsLock.Unlock()
Expand All @@ -503,6 +538,12 @@ func (tbp *TestBucketPool) createTestBuckets(numBuckets, bucketQuotaMB int, buck

tbp.emptyPreparedStatements(ctx, bucket)

if tbp.skipMobileXDCR {
tbp.Logf(ctx, "Not setting crossClusterVersioningEnabled")
} else {
tbp.setXDCRBucketSetting(ctx, bucket)
}

wg.Done()
}(testBucketName)
}
Expand Down
55 changes: 52 additions & 3 deletions base/main_test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,18 @@ type tbpCluster interface {
supportsCollections() (bool, error)
supportsMobileRBAC() (bool, error)
isServerEnterprise() (bool, error)
mobileXDCRCompatible(ctx context.Context) (bool, error)
close(context.Context) error
}

// firstServerVersionToSupportMobileXDCR this is the first server version to support Mobile XDCR feature
var firstServerVersionToSupportMobileXDCR = &ComparableBuildVersion{
epoch: 0,
major: 7,
minor: 6,
patch: 1,
}

type clusterLogFunc func(ctx context.Context, format string, args ...interface{})

// newTestCluster returns a cluster based on the driver used by the defaultBucketSpec. Accepts a clusterLogFunc to support
Expand All @@ -39,9 +48,10 @@ func newTestCluster(ctx context.Context, server string, logger clusterLogFunc) t

// tbpClusterV2 implements the tbpCluster interface for a gocb v2 cluster
type tbpClusterV2 struct {
logger clusterLogFunc
server string // server address to connect to cluster
connstr string // connection string used to connect to the cluster
logger clusterLogFunc
server string // server address to connect to cluster
connstr string // connection string used to connect to the cluster
supportsHLV bool // Flag to indicate cluster supports Mobile XDCR
// cluster can be used to perform cluster-level operations (but not bucket-level operations)
cluster *gocb.Cluster
}
Expand Down Expand Up @@ -164,6 +174,11 @@ func (c *tbpClusterV2) openTestBucket(ctx context.Context, testBucketName tbpBuc
return nil, err
}

// add whether bucket is mobile XDCR ready to bucket object
if c.supportsHLV {
bucketFromSpec.supportsHLV = true
}

return bucketFromSpec, nil
}

Expand Down Expand Up @@ -203,3 +218,37 @@ func (c *tbpClusterV2) supportsMobileRBAC() (bool, error) {
}
return major >= 7 && minor >= 1, nil
}

// mobileXDCRCompatible checks if a cluster is mobile XDCR compatible, a cluster must be enterprise edition AND > 7.6.1
func (c *tbpClusterV2) mobileXDCRCompatible(ctx context.Context) (bool, error) {
enterprise, err := c.isServerEnterprise()
if err != nil {
return false, err
}
if !enterprise {
return false, nil
}

metadata, err := c.cluster.Internal().GetNodesMetadata(&gocb.GetNodesMetadataOptions{})
if err != nil {
return false, err
}

// take server version, server version will be the first 5 character of version string
// in the form of x.x.x
vrs := metadata[0].Version[:5]

// convert the above string into a comparable string
version, err := NewComparableBuildVersionFromString(vrs)
if err != nil {
return false, err
}

if !version.Less(firstServerVersionToSupportMobileXDCR) {
c.supportsHLV = true
return true, nil
}
c.logger(ctx, "cluster does not support mobile XDCR")

return false, nil
}
8 changes: 8 additions & 0 deletions base/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ func TestX509LocalServer() (bool, string) {
return val, username
}

// TestSupportsMobileXDCR returns true to mobile XDCR bucket setting has been set to true for test bucket, false otherwise
func TestSupportsMobileXDCR() bool {
if GTestBucketPool.skipMobileXDCR {
return false
}
return true
}

// Should tests try to drop GSI indexes before flushing buckets?
// See SG #3422
func TestsShouldDropIndexes() bool {
Expand Down
Loading

0 comments on commit f396187

Please sign in to comment.