Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.1.4 fix] CBG-3777 make sure cbgt always sets kv_pool_size=1 #6681

Merged
merged 1 commit into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions base/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ func (p *GoCBConnStringParams) FillDefaults() {
}
}

// GetGoCBConnString builds a gocb connection string based on BucketSpec.Server.
func (spec *BucketSpec) GetGoCBConnString(params *GoCBConnStringParams) (string, error) {
// GetGoCBConnString builds a gocb connection string based on BucketSpec.Server. params defines the defaults that will be set for dcp_buffer_size, kv_buffer_size, kv_pool_size if they are non defined as nonzero. forceKvPoolSize will override the vlaue in BucketSpec.Server and the params. Both arguments are optional.
func (spec *BucketSpec) GetGoCBConnString(params *GoCBConnStringParams, forceKvPoolSize *int) (string, error) {
if params == nil {
params = &GoCBConnStringParams{}
}
Expand All @@ -196,7 +196,10 @@ func (spec *BucketSpec) GetGoCBConnString(params *GoCBConnStringParams) (string,

// Add kv_pool_size as used in both GoCB versions
poolSizeFromConnStr := asValues.Get("kv_pool_size")
if poolSizeFromConnStr == "" {
if forceKvPoolSize != nil {
asValues.Set("kv_pool_size", strconv.Itoa(*forceKvPoolSize))
spec.KvPoolSize = *forceKvPoolSize
} else if poolSizeFromConnStr == "" {
asValues.Set("kv_pool_size", strconv.Itoa(params.KVPoolSize))
spec.KvPoolSize = params.KVPoolSize
} else {
Expand Down
2 changes: 1 addition & 1 deletion base/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestGetGoCBConnString(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actualConnStr, err := test.bucketSpec.GetGoCBConnString(nil)
actualConnStr, err := test.bucketSpec.GetGoCBConnString(nil, nil)
assert.NoError(t, err, "Unexpected error creating connection string for bucket spec")
assert.Equal(t, test.expectedConnStr, actualConnStr)
})
Expand Down
2 changes: 1 addition & 1 deletion base/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// GetGoCBv2Bucket opens a connection to the Couchbase cluster and returns a *GocbV2Bucket for the specified BucketSpec.
func GetGoCBv2Bucket(ctx context.Context, spec BucketSpec) (*GocbV2Bucket, error) {

connString, err := spec.GetGoCBConnString(nil)
connString, err := spec.GetGoCBConnString(nil, nil)
if err != nil {
WarnfCtx(ctx, "Unable to parse server value: %s error: %v", SD(spec.Server), err)
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions base/dcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,9 @@ func (dc *DCPClient) initAgent(spec BucketSpec) error {
KVBufferSize: spec.KvBufferSize,
DCPBufferSize: spec.DcpBuffer,
}
connStr, err := spec.GetGoCBConnString(defaultValues)

// Force poolsize to 1, multiple clients results in DCP naming collision
connStr, err := spec.GetGoCBConnString(defaultValues, IntPtr(GoCBPoolSizeDCP))
if err != nil {
return err
}
Expand All @@ -337,8 +339,6 @@ func (dc *DCPClient) initAgent(spec BucketSpec) error {
return err
}

// Force poolsize to 1, multiple clients results in DCP naming collision
agentConfig.KVConfig.PoolSize = 1
agentConfig.BucketName = spec.BucketName
agentConfig.DCPConfig.AgentPriority = dc.agentPriority
agentConfig.SecurityConfig.Auth = auth
Expand Down
2 changes: 1 addition & 1 deletion base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG
DCPBufferSize: spec.DcpBuffer,
}

serverURL, err := spec.GetGoCBConnString(defaultValues)
serverURL, err := spec.GetGoCBConnString(defaultValues, IntPtr(GoCBPoolSizeDCP))
if err != nil {
return nil, err
}
Expand Down
16 changes: 16 additions & 0 deletions base/dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,19 @@ func legacyFeedParams(spec BucketSpec) (string, error) {
}
return string(paramBytes), nil
}

func TestCBGTKvPoolSize(t *testing.T) {
ctx := TestCtx(t)
bucket := GetTestBucket(t)
defer bucket.Close(ctx)

spec := bucket.BucketSpec
spec.Server += "&kv_pool_size=8"

cfg, err := NewCbgtCfgMem()
require.NoError(t, err)
cbgtContext, err := initCBGTManager(ctx, bucket, spec, cfg, t.Name(), "fakeDb")
assert.NoError(t, err)
defer cbgtContext.Stop()
require.Contains(t, cbgtContext.Manager.Server(), "kv_pool_size=1")
}
2 changes: 1 addition & 1 deletion base/main_test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func initV2Cluster(ctx context.Context, server string) *gocb.Cluster {
BucketOpTimeout: &testClusterTimeout,
}

connStr, err := spec.GetGoCBConnString(nil)
connStr, err := spec.GetGoCBConnString(nil, nil)
if err != nil {
FatalfCtx(ctx, "error getting connection string: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion rest/server_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config
if spec.DcpBuffer != 0 {
params.DCPBufferSize = spec.DcpBuffer
}
connStr, err := spec.GetGoCBConnString(params)
connStr, err := spec.GetGoCBConnString(params, nil)
if err != nil {
return nil, err
}
Expand Down
Loading