diff --git a/client.go b/client.go index 1903ded3..12f1d845 100644 --- a/client.go +++ b/client.go @@ -358,6 +358,22 @@ func (clnt *Client) Get(policy *BasePolicy, key *Key, binNames ...string) (*Reco return command.GetRecord(), nil } +// Like Get but does not deserialize bins containing a map +func (clnt *Client) GetLazy(policy *BasePolicy, key *Key, binNames ...string) (*Record, error) { + policy = clnt.getUsablePolicy(policy) + + command, err := newReadCommand(clnt.cluster, policy, key, binNames, nil) + if err != nil { + return nil, err + } + command.lazy = true + + if err := command.Execute(); err != nil { + return nil, err + } + return command.GetRecord(), nil +} + // GetHeader reads a record generation and expiration only for specified key. // Bins are not read. // The policy can be used to specify timeouts. diff --git a/command.go b/command.go index bb0d0966..5041a5eb 100644 --- a/command.go +++ b/command.go @@ -2117,6 +2117,10 @@ func (cmd *baseCommand) executeAt(ifc command, policy *BasePolicy, isRead bool, cmd.conn, err = ifc.getConnection(policy) if err != nil { + if policy.ExitFastOnExhaustedConnectionPool && err == types.ErrConnectionPoolEmptyAndAllConnectionsInUse { + break + } + isClientTimeout = true if err == types.ErrConnectionPoolEmpty { diff --git a/node.go b/node.go index 1fccfb25..92928b18 100644 --- a/node.go +++ b/node.go @@ -654,6 +654,10 @@ func (nd *Node) getConnectionWithHint(deadline time.Time, timeout time.Duration, } if conn == nil { + if nd.cluster.clientPolicy.LimitConnectionsToQueueSize && nd.connectionCount.Get() >= nd.cluster.clientPolicy.ConnectionQueueSize { + return nil, types.ErrConnectionPoolEmptyAndAllConnectionsInUse + } + // tentatively check if a connection is allowed to avoid launching too many goroutines. if err = nd.newConnectionAllowed(); err == nil { go nd.makeConnectionForPool(hint) diff --git a/policy.go b/policy.go index d866411e..fac73dff 100644 --- a/policy.go +++ b/policy.go @@ -148,6 +148,8 @@ type BasePolicy struct { // Scan and query are also not affected by replica algorithms. // Default to sending read commands to the node containing the key's master partition. ReplicaPolicy ReplicaPolicy + + ExitFastOnExhaustedConnectionPool bool } // NewPolicy generates a new BasePolicy instance with default values. diff --git a/read_command.go b/read_command.go index ca1cad12..f1038c45 100644 --- a/read_command.go +++ b/read_command.go @@ -20,6 +20,7 @@ import ( "github.com/aerospike/aerospike-client-go/v4/logger" "github.com/aerospike/aerospike-client-go/v4/types" + ParticleType "github.com/aerospike/aerospike-client-go/v4/types/particle_type" Buffer "github.com/aerospike/aerospike-client-go/v4/utils/buffer" ) @@ -35,6 +36,7 @@ type readCommand struct { object *reflect.Value replicaSequence int + lazy bool } // this method uses reflection. @@ -221,7 +223,15 @@ func (cmd *readCommand) parseRecord( receiveOffset += 4 + 4 + nameSize particleBytesSize := opSize - (4 + nameSize) - value, _ := bytesToParticle(particleType, cmd.dataBuffer, receiveOffset, particleBytesSize) + var value interface{} + if cmd.lazy && particleType == ParticleType.MAP { + // we need to make a copy as the buffer is reused + b := make([]byte, particleBytesSize) + copy(b, cmd.dataBuffer[receiveOffset:receiveOffset+particleBytesSize]) + value = newUnpacker(b, 0, particleBytesSize) + } else { + value, _ = bytesToParticle(particleType, cmd.dataBuffer, receiveOffset, particleBytesSize) + } receiveOffset += particleBytesSize if bins == nil { diff --git a/types/error.go b/types/error.go index 265dc457..ae0d8a07 100644 --- a/types/error.go +++ b/types/error.go @@ -58,10 +58,11 @@ func (ase *AerospikeError) MarkInDoubt() { // If no message is provided, the result code will be translated into the default // error message automatically. // To be able to check for error type, you could use the following: -// if aerr, ok := err.(AerospikeError); ok { -// errCode := aerr.ResultCode() -// errMessage := aerr.Error() -// } +// +// if aerr, ok := err.(AerospikeError); ok { +// errCode := aerr.ResultCode() +// errMessage := aerr.Error() +// } func NewAerospikeError(code ResultCode, messages ...string) error { if len(messages) == 0 { messages = []string{ResultCodeToString(code)} @@ -74,20 +75,21 @@ func NewAerospikeError(code ResultCode, messages ...string) error { //revive:disable var ( - ErrServerNotAvailable = NewAerospikeError(SERVER_NOT_AVAILABLE) - ErrKeyNotFound = NewAerospikeError(KEY_NOT_FOUND_ERROR) - ErrRecordsetClosed = NewAerospikeError(RECORDSET_CLOSED) - ErrConnectionPoolEmpty = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Connection pool is empty. This happens when either all connection are in-use already, or no connections were available") - ErrTooManyConnectionsForNode = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Connection limit reached for this node. This value is controlled via ClientPolicy.LimitConnectionsToQueueSize") - ErrTooManyOpeningConnections = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Too many connections are trying to open at once. This value is controlled via ClientPolicy.OpeningConnectionThreshold") - ErrTimeout = NewAerospikeError(TIMEOUT, "command execution timed out on client: See `Policy.Timeout`") - ErrUDFBadResponse = NewAerospikeError(UDF_BAD_RESPONSE, "Invalid UDF return value") - ErrNoOperationsSpecified = NewAerospikeError(INVALID_COMMAND, "No operations were passed to QueryExecute") - ErrNoBinNamesAlloedInQueryExecute = NewAerospikeError(INVALID_COMMAND, "Statement.BinNames must be empty for QueryExecute") - ErrFilteredOut = NewAerospikeError(FILTERED_OUT) - ErrPartitionScanQueryNotSupported = NewAerospikeError(PARAMETER_ERROR, "Partition Scans/Queries are not supported by all nodes in this cluster") - ErrScanTerminated = NewAerospikeError(SCAN_TERMINATED) - ErrQueryTerminated = NewAerospikeError(QUERY_TERMINATED) + ErrServerNotAvailable = NewAerospikeError(SERVER_NOT_AVAILABLE) + ErrKeyNotFound = NewAerospikeError(KEY_NOT_FOUND_ERROR) + ErrRecordsetClosed = NewAerospikeError(RECORDSET_CLOSED) + ErrConnectionPoolEmpty = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Connection pool is empty. This happens when either all connection are in-use already, or no connections were available") + ErrConnectionPoolEmptyAndAllConnectionsInUse = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Connection pool is empty and all connections are in use. This happens when all connection are in-use already") + ErrTooManyConnectionsForNode = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Connection limit reached for this node. This value is controlled via ClientPolicy.LimitConnectionsToQueueSize") + ErrTooManyOpeningConnections = NewAerospikeError(NO_AVAILABLE_CONNECTIONS_TO_NODE, "Too many connections are trying to open at once. This value is controlled via ClientPolicy.OpeningConnectionThreshold") + ErrTimeout = NewAerospikeError(TIMEOUT, "command execution timed out on client: See `Policy.Timeout`") + ErrUDFBadResponse = NewAerospikeError(UDF_BAD_RESPONSE, "Invalid UDF return value") + ErrNoOperationsSpecified = NewAerospikeError(INVALID_COMMAND, "No operations were passed to QueryExecute") + ErrNoBinNamesAlloedInQueryExecute = NewAerospikeError(INVALID_COMMAND, "Statement.BinNames must be empty for QueryExecute") + ErrFilteredOut = NewAerospikeError(FILTERED_OUT) + ErrPartitionScanQueryNotSupported = NewAerospikeError(PARAMETER_ERROR, "Partition Scans/Queries are not supported by all nodes in this cluster") + ErrScanTerminated = NewAerospikeError(SCAN_TERMINATED) + ErrQueryTerminated = NewAerospikeError(QUERY_TERMINATED) ) //revive:enable