Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,22 @@ func (clnt *Client) Get(policy *BasePolicy, key *Key, binNames ...string) (*Reco
return command.GetRecord(), nil
}

// Like Get but does not deserialize bins containing a map
func (clnt *Client) GetLazy(policy *BasePolicy, key *Key, binNames ...string) (*Record, error) {
policy = clnt.getUsablePolicy(policy)

command, err := newReadCommand(clnt.cluster, policy, key, binNames, nil)
if err != nil {
return nil, err
}
command.lazy = true

if err := command.Execute(); err != nil {
return nil, err
}
return command.GetRecord(), nil
}

// GetHeader reads a record generation and expiration only for specified key.
// Bins are not read.
// The policy can be used to specify timeouts.
Expand Down
4 changes: 4 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2117,6 +2117,10 @@ func (cmd *baseCommand) executeAt(ifc command, policy *BasePolicy, isRead bool,

cmd.conn, err = ifc.getConnection(policy)
if err != nil {
if policy.ExitFastOnExhaustedConnectionPool && err == types.ErrConnectionPoolEmptyAndAllConnectionsInUse {
break
}

isClientTimeout = true

if err == types.ErrConnectionPoolEmpty {
Expand Down
4 changes: 4 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,10 @@ func (nd *Node) getConnectionWithHint(deadline time.Time, timeout time.Duration,
}

if conn == nil {
if nd.cluster.clientPolicy.LimitConnectionsToQueueSize && nd.connectionCount.Get() >= nd.cluster.clientPolicy.ConnectionQueueSize {
return nil, types.ErrConnectionPoolEmptyAndAllConnectionsInUse
}

// tentatively check if a connection is allowed to avoid launching too many goroutines.
if err = nd.newConnectionAllowed(); err == nil {
go nd.makeConnectionForPool(hint)
Expand Down
2 changes: 2 additions & 0 deletions policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ type BasePolicy struct {
// Scan and query are also not affected by replica algorithms.
// Default to sending read commands to the node containing the key's master partition.
ReplicaPolicy ReplicaPolicy

ExitFastOnExhaustedConnectionPool bool
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it make sense to use ExitFastOnExhaustedConnectionPool naming here (same as in aerospike verision).

By doing so - when we will upgrade to even newer version - we will just remove this commit

}

// NewPolicy generates a new BasePolicy instance with default values.
Expand Down
12 changes: 11 additions & 1 deletion read_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/aerospike/aerospike-client-go/v4/logger"
"github.com/aerospike/aerospike-client-go/v4/types"
ParticleType "github.com/aerospike/aerospike-client-go/v4/types/particle_type"

Buffer "github.com/aerospike/aerospike-client-go/v4/utils/buffer"
)
Expand All @@ -35,6 +36,7 @@ type readCommand struct {
object *reflect.Value

replicaSequence int
lazy bool
}

// this method uses reflection.
Expand Down Expand Up @@ -221,7 +223,15 @@ func (cmd *readCommand) parseRecord(
receiveOffset += 4 + 4 + nameSize

particleBytesSize := opSize - (4 + nameSize)
value, _ := bytesToParticle(particleType, cmd.dataBuffer, receiveOffset, particleBytesSize)
var value interface{}
if cmd.lazy && particleType == ParticleType.MAP {
// we need to make a copy as the buffer is reused
b := make([]byte, particleBytesSize)
copy(b, cmd.dataBuffer[receiveOffset:receiveOffset+particleBytesSize])
value = newUnpacker(b, 0, particleBytesSize)
} else {
value, _ = bytesToParticle(particleType, cmd.dataBuffer, receiveOffset, particleBytesSize)
}
receiveOffset += particleBytesSize

if bins == nil {
Expand Down
38 changes: 20 additions & 18 deletions types/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ func (ase *AerospikeError) MarkInDoubt() {
// If no message is provided, the result code will be translated into the default
// error message automatically.
// To be able to check for error type, you could use the following:
// if aerr, ok := err.(AerospikeError); ok {
// errCode := aerr.ResultCode()
// errMessage := aerr.Error()
// }
//
// if aerr, ok := err.(AerospikeError); ok {
// errCode := aerr.ResultCode()
// errMessage := aerr.Error()
// }
func NewAerospikeError(code ResultCode, messages ...string) error {
if len(messages) == 0 {
messages = []string{ResultCodeToString(code)}
Expand All @@ -74,20 +75,21 @@ func NewAerospikeError(code ResultCode, messages ...string) error {
//revive:disable

var (
ErrServerNotAvailable = NewAerospikeError(SERVER_NOT_AVAILABLE)
ErrKeyNotFound = NewAerospikeError(KEY_NOT_FOUND_ERROR)
ErrRecordsetClosed = NewAerospikeError(RECORDSET_CLOSED)
ErrConnectionPoolEmpty = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Connection pool is empty. This happens when either all connection are in-use already, or no connections were available")
ErrTooManyConnectionsForNode = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Connection limit reached for this node. This value is controlled via ClientPolicy.LimitConnectionsToQueueSize")
ErrTooManyOpeningConnections = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Too many connections are trying to open at once. This value is controlled via ClientPolicy.OpeningConnectionThreshold")
ErrTimeout = NewAerospikeError(TIMEOUT, "command execution timed out on client: See `Policy.Timeout`")
ErrUDFBadResponse = NewAerospikeError(UDF_BAD_RESPONSE, "Invalid UDF return value")
ErrNoOperationsSpecified = NewAerospikeError(INVALID_COMMAND, "No operations were passed to QueryExecute")
ErrNoBinNamesAlloedInQueryExecute = NewAerospikeError(INVALID_COMMAND, "Statement.BinNames must be empty for QueryExecute")
ErrFilteredOut = NewAerospikeError(FILTERED_OUT)
ErrPartitionScanQueryNotSupported = NewAerospikeError(PARAMETER_ERROR, "Partition Scans/Queries are not supported by all nodes in this cluster")
ErrScanTerminated = NewAerospikeError(SCAN_TERMINATED)
ErrQueryTerminated = NewAerospikeError(QUERY_TERMINATED)
ErrServerNotAvailable = NewAerospikeError(SERVER_NOT_AVAILABLE)
ErrKeyNotFound = NewAerospikeError(KEY_NOT_FOUND_ERROR)
ErrRecordsetClosed = NewAerospikeError(RECORDSET_CLOSED)
ErrConnectionPoolEmpty = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Connection pool is empty. This happens when either all connection are in-use already, or no connections were available")
ErrConnectionPoolEmptyAndAllConnectionsInUse = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Connection pool is empty and all connections are in use. This happens when all connection are in-use already")
ErrTooManyConnectionsForNode = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Connection limit reached for this node. This value is controlled via ClientPolicy.LimitConnectionsToQueueSize")
ErrTooManyOpeningConnections = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Too many connections are trying to open at once. This value is controlled via ClientPolicy.OpeningConnectionThreshold")
ErrTimeout = NewAerospikeError(TIMEOUT, "command execution timed out on client: See `Policy.Timeout`")
ErrUDFBadResponse = NewAerospikeError(UDF_BAD_RESPONSE, "Invalid UDF return value")
ErrNoOperationsSpecified = NewAerospikeError(INVALID_COMMAND, "No operations were passed to QueryExecute")
ErrNoBinNamesAlloedInQueryExecute = NewAerospikeError(INVALID_COMMAND, "Statement.BinNames must be empty for QueryExecute")
ErrFilteredOut = NewAerospikeError(FILTERED_OUT)
ErrPartitionScanQueryNotSupported = NewAerospikeError(PARAMETER_ERROR, "Partition Scans/Queries are not supported by all nodes in this cluster")
ErrScanTerminated = NewAerospikeError(SCAN_TERMINATED)
ErrQueryTerminated = NewAerospikeError(QUERY_TERMINATED)
)

//revive:enable
Loading