Skip to content

Commit

Permalink
sqm later
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Dec 8, 2024
1 parent d8dd34b commit 559a13c
Show file tree
Hide file tree
Showing 41 changed files with 3,672 additions and 136 deletions.
26 changes: 26 additions & 0 deletions abort_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2014-2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aerospike

// Multi-record transaction (MRT) abort status code.
type AbortStatus string

const (
AbortStatusOk AbortStatus = "Abort succeeded"
AbortStatusAlreadyCommitted AbortStatus = "Already committed"
AbortStatusAlreadyAborted AbortStatus = "Already aborted"
AbortStatusRollBackAbandoned AbortStatus = "MRT client roll back abandoned. Server will eventually abort the MRT."
AbortStatusCloseAbandoned AbortStatus = "MRT has been rolled back, but MRT client close was abandoned. Server will eventually close the MRT."
)
13 changes: 13 additions & 0 deletions batch_attr.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type batchAttr struct {
readAttr int
writeAttr int
infoAttr int
txnAttr int
expiration uint32
generation uint32
hasWrite bool
Expand Down Expand Up @@ -291,3 +292,15 @@ func (ba *batchAttr) setBatchDelete(dp *BatchDeletePolicy) {
ba.infoAttr |= _INFO3_COMMIT_MASTER
}
}

func (ba *batchAttr) setTxn(attr int) {
ba.filterExp = nil
ba.readAttr = 0
ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS | _INFO2_DURABLE_DELETE
ba.infoAttr = 0
ba.txnAttr = attr
ba.expiration = 0
ba.generation = 0
ba.hasWrite = true
ba.sendKey = false
}
28 changes: 25 additions & 3 deletions batch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ type batcher interface {
generateBatchNodes(*Cluster) ([]*batchNode, Error)
setSequence(int, int)

executeSingle(clientIfc) Error
// executeSingle(clientIfc) Error
setInDoubt(batcher)
inDoubt()
}

type clientIfc interface {
Expand All @@ -52,6 +54,20 @@ type batchCommand struct {
filteredOutCnt int
}

func (cmd *batchCommand) setInDoubt(ifc batcher) {
// Set error/inDoubt for keys associated this batch command when
// the command was not retried and split. If a split retry occurred,
// those new subcommands have already set inDoubt on the affected
// subset of keys.
if !cmd.splitRetry {
ifc.inDoubt()
}
}

func (cmd *batchCommand) inDoubt() {
// do nothing by defaut
}

func (cmd *batchCommand) prepareRetry(ifc command, isTimeout bool) bool {
if !(cmd.policy.ReplicaPolicy == SEQUENCE || cmd.policy.ReplicaPolicy == PREFER_RACK) {
// Perform regular retry to same node.
Expand Down Expand Up @@ -79,12 +95,14 @@ func (cmd *batchCommand) retryBatch(ifc batcher, cluster *Cluster, deadline time
return false, nil
}

cmd.splitRetry = true

// Run batch requests sequentially in same thread.
var ferr Error
for _, batchNode := range batchNodes {
command := ifc.cloneBatchCommand(batchNode)
command.setSequence(cmd.sequenceAP, cmd.sequenceSC)
if err := command.executeAt(command, cmd.policy.GetBasePolicy(), deadline, iteration); err != nil {
if err := command.executeIter(command, iteration); err != nil {
ferr = chainErrors(err, ferr)
if !cmd.policy.AllowPartialResults {
return false, ferr
Expand All @@ -108,7 +126,11 @@ func (cmd *batchCommand) commandType() commandType {
}

func (cmd *batchCommand) Execute() Error {
return cmd.execute(cmd)
err := cmd.execute(cmd)
if err != nil {
cmd.setInDoubt(cmd)
}
return err
}

func (cmd *batchCommand) filteredOut() int {
Expand Down
2 changes: 1 addition & 1 deletion batch_command_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (cmd *batchCommandDelete) parseRecordResults(ifc command, receiveSize int)
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
err := cmd.skipKey(fieldCount)
err := cmd.parseFieldsWrite(resultCode, fieldCount, cmd.keys[batchIndex])
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion batch_command_exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (cmd *batchCommandExists) parseRecordResults(ifc command, receiveSize int)
return false, newCustomNodeError(cmd.node, types.PARSE_ERROR, "Received bins that were not requested!")
}

err := cmd.skipKey(fieldCount)
err := cmd.parseFieldsRead(fieldCount, cmd.keys[batchIndex])
if err != nil {
return false, err
}
Expand Down
6 changes: 3 additions & 3 deletions batch_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type batchCommandGet struct {
objectsFound []bool
}

type batchObjectParsetIfc interface {
type batchObjectParserIfc interface {
buf() []byte
readBytes(int) Error
object(int) *reflect.Value
Expand All @@ -46,7 +46,7 @@ type batchObjectParsetIfc interface {
// this method uses reflection.
// Will not be set if performance flag is passed for the build.
var batchObjectParser func(
cmd batchObjectParsetIfc,
cmd batchObjectParserIfc,
offset int,
opCount int,
fieldCount int,
Expand Down Expand Up @@ -140,7 +140,7 @@ func (cmd *batchCommandGet) parseRecordResults(ifc command, receiveSize int) (bo
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
err := cmd.skipKey(fieldCount)
err := cmd.parseFieldsRead(fieldCount, cmd.keys[batchIndex])
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion batch_command_get_reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func init() {
}

func parseBatchObject(
cmd batchObjectParsetIfc,
cmd batchObjectParserIfc,
offset int,
opCount int,
fieldCount int,
Expand Down
2 changes: 1 addition & 1 deletion batch_command_operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (cmd *batchCommandOperate) parseRecordResults(ifc command, receiveSize int)
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))

err := cmd.skipKey(fieldCount)
err := cmd.parseFieldsBatch(resultCode, fieldCount, cmd.records[batchIndex])
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion batch_command_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (cmd *batchCommandUDF) parseRecordResults(ifc command, receiveSize int) (bo
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
err := cmd.skipKey(fieldCount)
err := cmd.parseFieldsWrite(resultCode, fieldCount, cmd.keys[batchIndex])
if err != nil {
return false, err
}
Expand Down
17 changes: 17 additions & 0 deletions batch_executer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,20 @@ func (clnt *Client) batchExecute(policy *BatchPolicy, batchNodes []*batchNode, c

return filteredOut, errs
}

// batchExecuteSimple Uses werrGroup to run commands using multiple goroutines,
// and waits for their return
func (clnt *Client) batchExecuteSimple(policy *BatchPolicy, cmds []command) Error {
maxConcurrentNodes := policy.ConcurrentNodes
if maxConcurrentNodes <= 0 {
maxConcurrentNodes = len(cmds)
}

// we need this list to count the number of filtered out records
weg := newWeightedErrGroup(maxConcurrentNodes)
for _, cmd := range cmds {
weg.execute(cmd)
}

return weg.wait()
}
41 changes: 41 additions & 0 deletions batch_offsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2014-2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aerospike

type BatchOffsets interface {
size() int
get(int) int
}

// enforce the interface
var _ BatchOffsets = &batchOffsetsNative{}

type batchOffsetsNative struct {
offsets []int
}

func newBatchOffsetsNative(batch *batchNode) *batchOffsetsNative {
return &batchOffsetsNative{
offsets: batch.offsets,
}
}

func (bon *batchOffsetsNative) size() int {
return len(bon.offsets)
}

func (bon *batchOffsetsNative) get(i int) int {
return bon.offsets[i]
}
6 changes: 6 additions & 0 deletions bytes_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (buf *bufferEx) WriteInt16LittleEndian(num uint16) int {
return 2
}

func (buf *bufferEx) WriteInt32LittleEndian(num uint32) int {
binary.LittleEndian.PutUint32(buf.dataBuffer[buf.dataOffset:buf.dataOffset+4], num)
buf.dataOffset += 4
return 4
}

func (buf *bufferEx) WriteInt64LittleEndian(num uint64) int {
binary.LittleEndian.PutUint64(buf.dataBuffer[buf.dataOffset:buf.dataOffset+8], num)
buf.dataOffset += 8
Expand Down
Loading

0 comments on commit 559a13c

Please sign in to comment.