Skip to content

Commit

Permalink
Resolve memory leak caused by circular reference in client finalizer
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Jul 8, 2024
1 parent 8c8c1a6 commit 873c199
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 48 deletions.
8 changes: 7 additions & 1 deletion batch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ type batcher interface {
generateBatchNodes(*Cluster) ([]*batchNode, Error)
setSequence(int, int)

executeSingle(*Client) Error
executeSingle(clientIfc) Error
}

type clientIfc interface {
ClientIfc
execute(policy *WritePolicy, key *Key, packageName string, functionName string, args ...Value) (*Record, Error)
}

type batchCommand struct {
baseMultiCommand

client clientIfc
batch *batchNode
policy *BatchPolicy
sequenceAP int
Expand Down
12 changes: 9 additions & 3 deletions batch_command_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,22 @@ type batchCommandDelete struct {
}

func newBatchCommandDelete(
node *Node,
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
batchDeletePolicy *BatchDeletePolicy,
keys []*Key,
records []*BatchRecord,
attr *batchAttr,
) *batchCommandDelete {
var node *Node
if batch != nil {
node = batch.Node
}

res := &batchCommandDelete{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, false),
policy: policy,
batch: batch,
Expand Down Expand Up @@ -168,7 +174,7 @@ func (cmd *batchCommandDelete) transactionType() transactionType {
return ttBatchWrite
}

func (cmd *batchCommandDelete) executeSingle(client *Client) Error {
func (cmd *batchCommandDelete) executeSingle(client clientIfc) Error {
policy := cmd.batchDeletePolicy.toWritePolicy(cmd.policy)
for i, key := range cmd.keys {
res, err := client.Operate(policy, key, DeleteOp())
Expand Down Expand Up @@ -197,7 +203,7 @@ func (cmd *batchCommandDelete) executeSingle(client *Client) Error {

func (cmd *batchCommandDelete) Execute() Error {
if len(cmd.keys) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}
Expand Down
12 changes: 9 additions & 3 deletions batch_command_exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,20 @@ type batchCommandExists struct {
}

func newBatchCommandExists(
node *Node,
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
keys []*Key,
existsArray []bool,
) *batchCommandExists {
var node *Node
if batch != nil {
node = batch.Node
}

res := &batchCommandExists{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, false),
policy: policy,
batch: batch,
Expand Down Expand Up @@ -110,7 +116,7 @@ func (cmd *batchCommandExists) transactionType() transactionType {
return ttBatchRead
}

func (cmd *batchCommandExists) executeSingle(client *Client) Error {
func (cmd *batchCommandExists) executeSingle(client clientIfc) Error {
var err Error
for _, offset := range cmd.batch.offsets {
cmd.existsArray[offset], err = client.Exists(&cmd.policy.BasePolicy, cmd.keys[offset])
Expand All @@ -136,7 +142,7 @@ func (cmd *batchCommandExists) executeSingle(client *Client) Error {

func (cmd *batchCommandExists) Execute() Error {
if len(cmd.batch.offsets) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}
Expand Down
13 changes: 9 additions & 4 deletions batch_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var batchObjectParser func(
) Error

func newBatchCommandGet(
node *Node,
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
keys []*Key,
Expand All @@ -65,8 +65,14 @@ func newBatchCommandGet(
readAttr int,
isOperation bool,
) *batchCommandGet {
var node *Node
if batch != nil {
node = batch.Node
}

res := &batchCommandGet{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, isOperation),
policy: policy,
batch: batch,
Expand Down Expand Up @@ -158,7 +164,6 @@ func (cmd *batchCommandGet) parseRecordResults(ifc command, receiveSize int) (bo
cmd.objectsFound[batchIndex] = true
if err := batchObjectParser(cmd, batchIndex, opCount, fieldCount, generation, expiration); err != nil {
return false, err

}
}
}
Expand Down Expand Up @@ -216,7 +221,7 @@ func (cmd *batchCommandGet) transactionType() transactionType {
return ttBatchRead
}

func (cmd *batchCommandGet) executeSingle(client *Client) Error {
func (cmd *batchCommandGet) executeSingle(client clientIfc) Error {
for _, offset := range cmd.batch.offsets {
var err Error
if len(cmd.ops) > 0 {
Expand Down Expand Up @@ -254,7 +259,7 @@ func (cmd *batchCommandGet) executeSingle(client *Client) Error {

func (cmd *batchCommandGet) Execute() Error {
if cmd.objects == nil && len(cmd.batch.offsets) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}
Expand Down
17 changes: 9 additions & 8 deletions batch_command_operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

type batchCommandOperate struct {
batchCommand
client ClientIfc

attr *batchAttr
records []BatchRecordIfc
Expand All @@ -36,19 +35,23 @@ type batchCommandOperate struct {
}

func newBatchCommandOperate(
client ClientIfc,
node *Node,
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
records []BatchRecordIfc,
) *batchCommandOperate {
var node *Node
if batch != nil {
node = batch.Node
}

res := &batchCommandOperate{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, true),
policy: policy,
batch: batch,
},
client: client,
records: records,
}
return res
Expand Down Expand Up @@ -228,7 +231,7 @@ func (cmd *batchCommandOperate) parseRecord(key *Key, opCount int, generation, e
return newRecord(cmd.node, key, bins, generation, expiration), nil
}

func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
func (cmd *batchCommandOperate) executeSingle(client clientIfc) Error {
var res *Record
var err Error
for _, br := range cmd.records {
Expand All @@ -250,11 +253,9 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
policy := cmd.client.getUsableBatchWritePolicy(br.Policy).toWritePolicy(cmd.policy)
policy.RespondPerEachOp = true
res, err = client.Operate(policy, br.Key, br.Ops...)
br.setRecord(res)
case *BatchDelete:
policy := cmd.client.getUsableBatchDeletePolicy(br.Policy).toWritePolicy(cmd.policy)
res, err = client.Operate(policy, br.Key, DeleteOp())
br.setRecord(res)
case *BatchUDF:
policy := cmd.client.getUsableBatchUDFPolicy(br.Policy).toWritePolicy(cmd.policy)
policy.RespondPerEachOp = true
Expand Down Expand Up @@ -287,7 +288,7 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {

func (cmd *batchCommandOperate) Execute() Error {
if cmd.objects == nil && len(cmd.records) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}
Expand Down
25 changes: 16 additions & 9 deletions batch_command_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type batchCommandUDF struct {
}

func newBatchCommandUDF(
node *Node,
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
batchUDFPolicy *BatchUDFPolicy,
Expand All @@ -43,18 +43,25 @@ func newBatchCommandUDF(
records []*BatchRecord,
attr *batchAttr,
) *batchCommandUDF {
var node *Node
if batch != nil {
node = batch.Node
}

res := &batchCommandUDF{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, false),
policy: policy,
batch: batch,
},
keys: keys,
records: records,
packageName: packageName,
functionName: functionName,
args: args,
attr: attr,
batchUDFPolicy: batchUDFPolicy,
keys: keys,
records: records,
packageName: packageName,
functionName: functionName,
args: args,
attr: attr,
}
return res
}
Expand Down Expand Up @@ -176,7 +183,7 @@ func (cmd *batchCommandUDF) isRead() bool {
return !cmd.attr.hasWrite
}

func (cmd *batchCommandUDF) executeSingle(client *Client) Error {
func (cmd *batchCommandUDF) executeSingle(client clientIfc) Error {
for i, key := range cmd.keys {
policy := cmd.batchUDFPolicy.toWritePolicy(cmd.policy)
policy.RespondPerEachOp = true
Expand Down Expand Up @@ -206,7 +213,7 @@ func (cmd *batchCommandUDF) executeSingle(client *Client) Error {

func (cmd *batchCommandUDF) Execute() Error {
if len(cmd.keys) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}
Expand Down
6 changes: 4 additions & 2 deletions batch_index_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type batchIndexCommandGet struct {
}

func newBatchIndexCommandGet(
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
records []*BatchRead,
Expand All @@ -34,6 +35,7 @@ func newBatchIndexCommandGet(
res := &batchIndexCommandGet{
batchCommandGet{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, isOperation),
policy: policy,
batch: batch,
Expand All @@ -59,12 +61,12 @@ func (cmd *batchIndexCommandGet) writeBuffer(ifc command) Error {

func (cmd *batchIndexCommandGet) Execute() Error {
if len(cmd.batch.offsets) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}

func (cmd *batchIndexCommandGet) executeSingle(client *Client) Error {
func (cmd *batchIndexCommandGet) executeSingle(client clientIfc) Error {
for i, br := range cmd.indexRecords {
var ops []*Operation
if br.headerOnly() {
Expand Down
20 changes: 8 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,8 @@ func NewClientWithPolicyAndHost(policy *ClientPolicy, hosts ...*Host) (*Client,
DefaultInfoPolicy: NewInfoPolicy(),
}

// back reference especially used in batch commands
cluster.client = client

runtime.SetFinalizer(client, clientFinalizer)
return client, err

}

//-------------------------------------------------------
Expand Down Expand Up @@ -453,7 +449,7 @@ func (clnt *Client) BatchExists(policy *BatchPolicy, keys []*Key) ([]bool, Error
}

// pass nil to make sure it will be cloned and prepared
cmd := newBatchCommandExists(nil, nil, policy, keys, existsArray)
cmd := newBatchCommandExists(clnt, nil, policy, keys, existsArray)
filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if filteredOut > 0 {
err = chainErrors(ErrFilteredOut.err(), err)
Expand Down Expand Up @@ -526,7 +522,7 @@ func (clnt *Client) BatchGet(policy *BatchPolicy, keys []*Key, binNames ...strin
return nil, err
}

cmd := newBatchCommandGet(nil, nil, policy, keys, binNames, nil, records, _INFO1_READ, false)
cmd := newBatchCommandGet(clnt, nil, policy, keys, binNames, nil, records, _INFO1_READ, false)
filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if err != nil && !policy.AllowPartialResults {
return nil, err
Expand Down Expand Up @@ -556,7 +552,7 @@ func (clnt *Client) BatchGetOperate(policy *BatchPolicy, keys []*Key, ops ...*Op
return nil, err
}

cmd := newBatchCommandGet(nil, nil, policy, keys, nil, ops, records, _INFO1_READ, true)
cmd := newBatchCommandGet(clnt, nil, policy, keys, nil, ops, records, _INFO1_READ, true)
filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if err != nil && !policy.AllowPartialResults {
return nil, err
Expand All @@ -578,7 +574,7 @@ func (clnt *Client) BatchGetOperate(policy *BatchPolicy, keys []*Key, ops ...*Op
func (clnt *Client) BatchGetComplex(policy *BatchPolicy, records []*BatchRead) Error {
policy = clnt.getUsableBatchPolicy(policy)

cmd := newBatchIndexCommandGet(nil, policy, records, true)
cmd := newBatchIndexCommandGet(clnt, nil, policy, records, true)

batchNodes, err := newBatchIndexNodeList(clnt.cluster, policy, records)
if err != nil {
Expand Down Expand Up @@ -614,7 +610,7 @@ func (clnt *Client) BatchGetHeader(policy *BatchPolicy, keys []*Key) ([]*Record,
return nil, err
}

cmd := newBatchCommandGet(nil, nil, policy, keys, nil, nil, records, _INFO1_READ|_INFO1_NOBINDATA, false)
cmd := newBatchCommandGet(clnt, nil, policy, keys, nil, nil, records, _INFO1_READ|_INFO1_NOBINDATA, false)
filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if err != nil && !policy.AllowPartialResults {
return nil, err
Expand Down Expand Up @@ -650,7 +646,7 @@ func (clnt *Client) BatchDelete(policy *BatchPolicy, deletePolicy *BatchDeletePo
return nil, err
}

cmd := newBatchCommandDelete(nil, nil, policy, deletePolicy, keys, records, attr)
cmd := newBatchCommandDelete(clnt, nil, policy, deletePolicy, keys, records, attr)
_, err = clnt.batchExecute(policy, batchNodes, cmd)
return records, err
}
Expand All @@ -670,7 +666,7 @@ func (clnt *Client) BatchOperate(policy *BatchPolicy, records []BatchRecordIfc)
return err
}

cmd := newBatchCommandOperate(clnt, nil, nil, policy, records)
cmd := newBatchCommandOperate(clnt, nil, policy, records)
_, err = clnt.batchExecute(policy, batchNodes, cmd)
return err
}
Expand Down Expand Up @@ -701,7 +697,7 @@ func (clnt *Client) BatchExecute(policy *BatchPolicy, udfPolicy *BatchUDFPolicy,
return nil, err
}

cmd := newBatchCommandUDF(nil, nil, policy, udfPolicy, keys, packageName, functionName, args, records, attr)
cmd := newBatchCommandUDF(clnt, nil, policy, udfPolicy, keys, packageName, functionName, args, records, attr)
_, err = clnt.batchExecute(policy, batchNodes, cmd)
return records, err
}
Expand Down
Loading

0 comments on commit 873c199

Please sign in to comment.