Skip to content

Commit

Permalink
Merge branch 'v7' into mrt
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Dec 11, 2024
2 parents 559a13c + ab899d5 commit f810971
Show file tree
Hide file tree
Showing 53 changed files with 1,979 additions and 1,454 deletions.
23 changes: 22 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
# Change History

## November 29 2024: v7.7.3

Minor fix release.

- **Fixes**
- [CLIENT-3196] Parse nil keys properly in scan/query operations.

## November 1 2024: v7.7.2

Minor fix release.

- **Fixes**
- [CLIENT-3156] Fix an issue where rack policy always returns the master node. Resolves #455

## September 18 2024: v7.7.1

Hot Fix release. This version fixes a major bug introduced in v7.7.0. You should use this release instead.

- **Fixes**
- [CLIENT-3122] Fix `nil` dereference in the tend logic.

## September 13 2024: v7.7.0

Minor improvement release.
Expand Down Expand Up @@ -51,7 +72,7 @@
- **Improvements**
- [CLIENT-2997] Scans should work in a mixed cluster of v5.7 and v6.4 server nodes.
- [CLIENT-3020] Change `ReadModeSC` doc from server to client perspective.

- **Fixes**
- [CLIENT-3019] Prevent Goroutine leak in `AuthInterceptor` for the Proxy Client.

Expand Down
2 changes: 1 addition & 1 deletion abort_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package aerospike
type AbortStatus string

const (
AbortStatusOk AbortStatus = "Abort succeeded"
AbortStatusOK AbortStatus = "Abort succeeded"
AbortStatusAlreadyCommitted AbortStatus = "Already committed"
AbortStatusAlreadyAborted AbortStatus = "Already aborted"
AbortStatusRollBackAbandoned AbortStatus = "MRT client roll back abandoned. Server will eventually abort the MRT."
Expand Down
26 changes: 14 additions & 12 deletions aerospike_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,19 @@ import (
)

var (
hosts = flag.String("hosts", "", "Comma separated Aerospike server seed hostnames or IP addresses and ports. eg: s1:3000,s2:3000,s3:3000")
host = flag.String("h", "127.0.0.1", "Aerospike server seed hostnames or IP addresses")
nativeHosts = flag.String("nh", "127.0.0.1:3000", "Native Aerospike server seed hostnames or IP addresses, used in tests for GRPC to support unsupported API")
port = flag.Int("p", 3000, "Aerospike server seed hostname or IP address port number.")
user = flag.String("U", "", "Username.")
password = flag.String("P", "", "Password.")
authMode = flag.String("A", "internal", "Authentication mode: internal | external")
useReplicas = flag.Bool("use-replicas", false, "Aerospike will use replicas as well as master partitions.")
debug = flag.Bool("debug", false, "Will set the logging level to DEBUG.")
proxy = flag.Bool("proxy", false, "Will use Proxy Client.")
dbaas = flag.Bool("dbaas", false, "Will run the tests for a dbaas environment.")
namespace = flag.String("n", "test", "Namespace")
hosts = flag.String("hosts", "", "Comma separated Aerospike server seed hostnames or IP addresses and ports. eg: s1:3000,s2:3000,s3:3000")
host = flag.String("h", "127.0.0.1", "Aerospike server seed hostnames or IP addresses")
nativeHosts = flag.String("nh", "127.0.0.1:3000", "Native Aerospike server seed hostnames or IP addresses, used in tests for GRPC to support unsupported API")
port = flag.Int("p", 3000, "Aerospike server seed hostname or IP address port number.")
user = flag.String("U", "", "Username.")
password = flag.String("P", "", "Password.")
authMode = flag.String("A", "internal", "Authentication mode: internal | external")
useReplicas = flag.Bool("use-replicas", false, "Aerospike will use replicas as well as master partitions.")
debug = flag.Bool("debug", false, "Will set the logging level to DEBUG.")
proxy = flag.Bool("proxy", false, "Will use Proxy Client.")
dbaas = flag.Bool("dbaas", false, "Will run the tests for a dbaas environment.")
namespace = flag.String("n", "test", "Namespace")
UseServicesAlternate = flag.Bool("use-services-alternate", false, "Will set ClientPolicy.UseServicesAlternate to true.")

certFile = flag.String("cert_file", "", "Certificate file name.")
keyFile = flag.String("key_file", "", "Key file name.")
Expand Down Expand Up @@ -100,6 +101,7 @@ func initTestVars() {
// setup TLS
tlsConfig = initTLS()
clientPolicy.TlsConfig = tlsConfig
clientPolicy.UseServicesAlternate = *UseServicesAlternate

var dbHosts []*as.Host

Expand Down
100 changes: 100 additions & 0 deletions base_read_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2014-2022 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aerospike

import (
"reflect"

"github.com/aerospike/aerospike-client-go/v7/types"
)

type baseReadCommand struct {
singleCommand

policy *BasePolicy
binNames []string
record *Record

// pointer to the object that's going to be unmarshalled
object *reflect.Value

replicaSequence int
}

// this method uses reflection.
// Will not be set if performance flag is passed for the build.
var objectParser func(
brc *baseReadCommand,
opCount int,
fieldCount int,
generation uint32,
expiration uint32,
) Error

func newBaseReadCommand(cluster *Cluster, policy *BasePolicy, key *Key) (baseReadCommand, Error) {
var partition *Partition
var err Error
if cluster != nil {
partition, err = PartitionForRead(cluster, policy, key)
if err != nil {
return baseReadCommand{}, err
}
}

return baseReadCommand{
singleCommand: newSingleCommand(cluster, key, partition),
policy: policy,
}, nil
}

func (cmd *baseReadCommand) getPolicy(ifc command) Policy {
return cmd.policy
}

func (cmd *baseReadCommand) writeBuffer(ifc command) Error {
panic(unreachable)
}

func (cmd *baseReadCommand) getNode(ifc command) (*Node, Error) {
return cmd.partition.GetNodeRead(cmd.cluster)
}

func (cmd *baseReadCommand) prepareRetry(ifc command, isTimeout bool) bool {
cmd.partition.PrepareRetryRead(isTimeout)
return true
}

func (cmd *baseReadCommand) parseResult(ifc command, conn *Connection) Error {
panic(unreachable)
}

func (cmd *baseReadCommand) handleUdfError(resultCode types.ResultCode) Error {
if ret, exists := cmd.record.Bins["FAILURE"]; exists {
return newError(resultCode, ret.(string))
}
return newError(resultCode)
}

func (cmd *baseReadCommand) GetRecord() *Record {
return cmd.record
}

func (cmd *baseReadCommand) Execute() Error {
panic(unreachable)
}

func (cmd *baseReadCommand) commandType() commandType {
return ttGet
}
102 changes: 102 additions & 0 deletions base_write_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2014-2022 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aerospike

import "github.com/aerospike/aerospike-client-go/v7/types"

// guarantee baseWriteCommand implements command interface
var _ command = &baseWriteCommand{}

type baseWriteCommand struct {
singleCommand

policy *WritePolicy
}

func newBaseWriteCommand(
cluster *Cluster,
policy *WritePolicy,
key *Key,
) (baseWriteCommand, Error) {

var partition *Partition
var err Error
if cluster != nil {
partition, err = PartitionForWrite(cluster, &policy.BasePolicy, key)
if err != nil {
return baseWriteCommand{}, err
}
}

newBaseWriteCmd := baseWriteCommand{
singleCommand: newSingleCommand(cluster, key, partition),
policy: policy,
}

return newBaseWriteCmd, nil
}

func (cmd *baseWriteCommand) writeBuffer(ifc command) Error {
panic(unreachable)
}

func (cmd *baseWriteCommand) getPolicy(ifc command) Policy {
return cmd.policy
}

func (cmd *baseWriteCommand) getNode(ifc command) (*Node, Error) {
return cmd.partition.GetNodeWrite(cmd.cluster)
}

func (cmd *baseWriteCommand) prepareRetry(ifc command, isTimeout bool) bool {
cmd.partition.PrepareRetryWrite(isTimeout)
return true
}

func (cmd *baseWriteCommand) isRead() bool {
return false
}

func (cmd *baseWriteCommand) parseResult(ifc command, conn *Connection) Error {
panic(unreachable)
}

func (cmd *baseWriteCommand) Execute() Error {
panic(unreachable)
}

func (cmd *baseWriteCommand) onInDoubt() {
if cmd.policy.Txn != nil {
cmd.policy.Txn.OnWriteInDoubt(cmd.key)
}

}

func (cmd *baseWriteCommand) commandType() commandType {
return ttPut
}

func (cmd *baseWriteCommand) parseHeader() (types.ResultCode, Error) {
rp, err := newRecordParser(&cmd.baseCommand)
if err != nil {
return err.resultCode(), err
}

if err := rp.parseFields(cmd.policy.Txn, cmd.key, true); err != nil {
return err.resultCode(), err
}

return rp.resultCode, nil
}
36 changes: 19 additions & 17 deletions batch_attr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,18 @@ func newBatchAttr(policy *BatchPolicy, rattr int) *batchAttr {
return res
}

func newBatchAttrOps(rp *BatchPolicy, wp *BatchWritePolicy, ops []*Operation) {
func newBatchAttrOpsAttr(policy *BatchPolicy, rattr int, ops []*Operation) *batchAttr {
res := &batchAttr{}
res.setRead(policy)
res.readAttr = rattr
if len(ops) > 0 {
res.adjustRead(ops)
}
return res
}

// TODO: Check references
func newBatchAttrOps(rp *BatchPolicy, wp *BatchWritePolicy, ops []*Operation) *batchAttr {
res := &batchAttr{}
readAllBins := false
readHeader := false
Expand Down Expand Up @@ -79,6 +90,8 @@ func newBatchAttrOps(rp *BatchPolicy, wp *BatchWritePolicy, ops []*Operation) {
res.readAttr |= _INFO1_NOBINDATA
}
}

return res
}

func (ba *batchAttr) setRead(rp *BatchPolicy) {
Expand Down Expand Up @@ -136,27 +149,16 @@ func (ba *batchAttr) setBatchRead(rp *BatchReadPolicy) {
}

func (ba *batchAttr) adjustRead(ops []*Operation) {
readAllBins := false
readHeader := false

for _, op := range ops {
switch op.opType {
case _READ_HEADER:
readHeader = true
case _BIT_READ, _EXP_READ, _HLL_READ, _MAP_READ, _CDT_READ, _READ:
// Read all bins if no bin is specified.
if op.binName == "" {
readAllBins = true
case _READ:
if len(op.binName) == 0 {
ba.readAttr |= _INFO1_GET_ALL
}
default:
case _READ_HEADER:
ba.readAttr |= _INFO1_NOBINDATA
}
}

if readAllBins {
ba.readAttr |= _INFO1_GET_ALL
} else if readHeader {
ba.readAttr |= _INFO1_NOBINDATA
}
}

func (ba *batchAttr) adjustReadForAllBins(readAllBins bool) {
Expand Down
20 changes: 10 additions & 10 deletions batch_command_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func newBatchCommandDelete(
records: records,
attr: attr,
}
res.txn = policy.Txn
return res
}

Expand All @@ -80,6 +81,15 @@ func (cmd *batchCommandDelete) parseRecordResults(ifc command, receiveSize int)
return false, err
}
resultCode := types.ResultCode(cmd.dataBuffer[5] & 0xFF)
generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
expiration := types.TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
err := cmd.parseFieldsWrite(resultCode, fieldCount, cmd.keys[batchIndex])
if err != nil {
return false, err
}

// The only valid server return codes are "ok" and "not found" and "filtered out".
// If other return codes are received, then abort the batch.
Expand All @@ -102,16 +112,6 @@ func (cmd *batchCommandDelete) parseRecordResults(ifc command, receiveSize int)
return false, nil
}

generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
expiration := types.TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
err := cmd.parseFieldsWrite(resultCode, fieldCount, cmd.keys[batchIndex])
if err != nil {
return false, err
}

if resultCode == 0 {
if err = cmd.parseRecord(cmd.records[batchIndex], cmd.keys[batchIndex], opCount, generation, expiration); err != nil {
return false, err
Expand Down
Loading

0 comments on commit f810971

Please sign in to comment.