Skip to content

Commit

Permalink
[CLIENT-3159] Support writing raw payload to the server
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Dec 1, 2024
1 parent ab899d5 commit 9de9f9e
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 18 deletions.
13 changes: 13 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,19 @@ func (clnt *Client) GetNodeNames() []string {
// Write Record Operations
//-------------------------------------------------------

// PutPayload writes the raw write/delete payload to the server.
// The policy specifies the transaction timeout.
// If the policy is nil, the default relevant policy will be used.
func (clnt *Client) PutPayload(policy *WritePolicy, key *Key, payload []byte) Error {
policy = clnt.getUsableWritePolicy(policy)
command, err := newWritePayloadCommand(clnt.cluster, policy, key, payload)
if err != nil {
return err
}

return command.Execute()
}

// Put writes record bin(s) to the server.
// The policy specifies the transaction timeout, record expiration and how the transaction is
// handled when the record already exists.
Expand Down
1 change: 1 addition & 0 deletions client_ifc.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type ClientIfc interface {
Operate(policy *WritePolicy, key *Key, operations ...*Operation) (*Record, Error)
Prepend(policy *WritePolicy, key *Key, binMap BinMap) Error
PrependBins(policy *WritePolicy, key *Key, bins ...*Bin) Error
PutPayload(policy *WritePolicy, key *Key, payload []byte) Error
Put(policy *WritePolicy, key *Key, binMap BinMap) Error
PutBins(policy *WritePolicy, key *Key, bins ...*Bin) Error
Query(policy *QueryPolicy, statement *Statement) (*Recordset, Error)
Expand Down
62 changes: 62 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,68 @@ var _ = gg.Describe("Aerospike", func() {
gm.Expect(err).ToNot(gm.HaveOccurred())
})

gg.Context("PutPayload operations", func() {

gg.It("must put a record", func() {
key, err = as.NewKey(ns, set, 0)
gm.Expect(err).ToNot(gm.HaveOccurred())

binMap := as.BinMap{
"Aerospike": "value",
"Aerospike1": "value2",
}

wcmd, err := as.NewWriteCommand(nil, wpolicy, key, nil, binMap)
gm.Expect(err).ToNot(gm.HaveOccurred())

err = wcmd.WriteBuffer(&wcmd)
gm.Expect(err).ToNot(gm.HaveOccurred())
payload := wcmd.Buffer()

client.Delete(nil, key)
gm.Expect(err).ToNot(gm.HaveOccurred())

err = client.PutPayload(nil, key, payload)
gm.Expect(err).ToNot(gm.HaveOccurred())

rec, err := client.Get(nil, key)
gm.Expect(err).ToNot(gm.HaveOccurred())
gm.Expect(rec.Bins).To(gm.Equal(binMap))
})

gg.It("must delete a record", func() {
key, err = as.NewKey(ns, set, 0)
gm.Expect(err).ToNot(gm.HaveOccurred())

binMap := as.BinMap{
"Aerospike": "value",
"Aerospike1": "value2",
}

err := client.Put(nil, key, binMap)
gm.Expect(err).ToNot(gm.HaveOccurred())

exists, err := client.Exists(nil, key)
gm.Expect(err).ToNot(gm.HaveOccurred())
gm.Expect(exists).To(gm.BeTrue())

dcmd, err := as.NewDeleteCommand(nil, wpolicy, key)
gm.Expect(err).ToNot(gm.HaveOccurred())

err = dcmd.WriteBuffer(dcmd)
gm.Expect(err).ToNot(gm.HaveOccurred())
payload := dcmd.Buffer()

err = client.PutPayload(nil, key, payload)
gm.Expect(err).ToNot(gm.HaveOccurred())

exists, err = client.Exists(nil, key)
gm.Expect(err).ToNot(gm.HaveOccurred())
gm.Expect(exists).To(gm.BeFalse())
})

})

gg.Context("Put operations", func() {

gg.Context("Expiration values", func() {
Expand Down
39 changes: 21 additions & 18 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2702,27 +2702,30 @@ func (cmd *baseCommand) executeAt(ifc command, policy *BasePolicy, deadline time
return err
}

// Reset timeout in send buffer (destined for server) and socket.
binary.BigEndian.PutUint32(cmd.dataBuffer[22:], 0)
if !deadline.IsZero() {
serverTimeout := time.Until(deadline)
if serverTimeout < time.Millisecond {
serverTimeout = time.Millisecond
if _, rawPayload := ifc.(*writePayloadCommand); !rawPayload {
// Reset timeout in send buffer (destined for server) and socket.
binary.BigEndian.PutUint32(cmd.dataBuffer[22:], 0)
if !deadline.IsZero() {
serverTimeout := time.Until(deadline)
if serverTimeout < time.Millisecond {
serverTimeout = time.Millisecond
}
binary.BigEndian.PutUint32(cmd.dataBuffer[22:], uint32(serverTimeout/time.Millisecond))
}
binary.BigEndian.PutUint32(cmd.dataBuffer[22:], uint32(serverTimeout/time.Millisecond))
}

// now that the deadline has been set in the buffer, compress the contents
if err = cmd.compress(); err != nil {
applyTransactionErrorMetrics(cmd.node)
return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node).setInDoubt(ifc.isRead(), cmd.commandSentCounter)
}
// now that the deadline has been set in the buffer, compress the contents
if err = cmd.compress(); err != nil {
applyTransactionErrorMetrics(cmd.node)
return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node).setInDoubt(ifc.isRead(), cmd.commandSentCounter)
}

// now that the deadline has been set in the buffer, compress the contents
if err = cmd.prepareBuffer(ifc, deadline); err != nil {
applyTransactionErrorMetrics(cmd.node)
applyTransactionMetrics(cmd.node, ifc.transactionType(), transStart)
return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node)
// TODO: Redundant. Move everything to the prepareBuffer method and remove the branch on top of this block.
// // now that the deadline has been set in the buffer, compress the contents
// if err = cmd.prepareBuffer(ifc, deadline); err != nil {
// applyTransactionErrorMetrics(cmd.node)
// applyTransactionMetrics(cmd.node, ifc.transactionType(), transStart)
// return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node)
// }
}

// Send command.
Expand Down
35 changes: 35 additions & 0 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,38 @@ func (nd *Node) ConnsCount() int {
func (nd *Node) CloseConnections() {
nd.closeConnections()
}

func NewWriteCommand(
cluster *Cluster,
policy *WritePolicy,
key *Key,
bins []*Bin,
binMap BinMap) (writeCommand, Error) {
return newWriteCommand(
cluster,
policy,
key,
bins,
binMap,
_WRITE)
}

func (cmd *writeCommand) WriteBuffer(ifc command) Error {
return cmd.writeBuffer(ifc)
}

func (cmd *writeCommand) Buffer() []byte {
return cmd.dataBuffer[:cmd.dataOffset]
}

func NewDeleteCommand(cluster *Cluster, policy *WritePolicy, key *Key) (*deleteCommand, Error) {
return newDeleteCommand(cluster, policy, key)
}

func (cmd *deleteCommand) WriteBuffer(ifc command) Error {
return cmd.writeBuffer(ifc)
}

func (cmd *deleteCommand) Buffer() []byte {
return cmd.dataBuffer[:cmd.dataOffset]
}
120 changes: 120 additions & 0 deletions write_payload_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2014-2022 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aerospike

import (
"github.com/aerospike/aerospike-client-go/v7/types"

Buffer "github.com/aerospike/aerospike-client-go/v7/utils/buffer"
)

// guarantee writePayloadCommand implements command interface
var _ command = &writePayloadCommand{}

type writePayloadCommand struct {
singleCommand

policy *WritePolicy
payload []byte
}

func newWritePayloadCommand(
cluster *Cluster,
policy *WritePolicy,
key *Key,
payload []byte,
) (writePayloadCommand, Error) {

var partition *Partition
var err Error
if cluster != nil {
partition, err = PartitionForWrite(cluster, &policy.BasePolicy, key)
if err != nil {
return writePayloadCommand{}, err
}
}

newWriteCmd := writePayloadCommand{
singleCommand: newSingleCommand(cluster, key, partition),
policy: policy,
payload: payload,
}

return newWriteCmd, nil
}

func (cmd *writePayloadCommand) getPolicy(ifc command) Policy {
return cmd.policy
}

func (cmd *writePayloadCommand) writeBuffer(ifc command) Error {
cmd.dataBuffer = cmd.payload
cmd.dataOffset = len(cmd.payload)
return nil
}

func (cmd *writePayloadCommand) getNode(ifc command) (*Node, Error) {
return cmd.partition.GetNodeWrite(cmd.cluster)
}

func (cmd *writePayloadCommand) prepareRetry(ifc command, isTimeout bool) bool {
cmd.partition.PrepareRetryWrite(isTimeout)
return true
}

func (cmd *writePayloadCommand) parseResult(ifc command, conn *Connection) Error {
// make sure the payload is not put back in the buffer pool
defer func() {
cmd.dataBuffer = cmd.conn.origDataBuffer
cmd.dataOffset = 0
}()

// Read header.
if _, err := conn.Read(cmd.dataBuffer, int(_MSG_TOTAL_HEADER_SIZE)); err != nil {
return err
}

header := Buffer.BytesToInt64(cmd.dataBuffer, 0)

// Validate header to make sure we are at the beginning of a message
if err := cmd.validateHeader(header); err != nil {
return err
}

resultCode := cmd.dataBuffer[13] & 0xFF

if resultCode != 0 {
if resultCode == byte(types.KEY_NOT_FOUND_ERROR) {
return ErrKeyNotFound.err()
} else if types.ResultCode(resultCode) == types.FILTERED_OUT {
return ErrFilteredOut.err()
}

return newCustomNodeError(cmd.node, types.ResultCode(resultCode))
}
return cmd.emptySocket(conn)
}

func (cmd *writePayloadCommand) isRead() bool {
return false
}

func (cmd *writePayloadCommand) Execute() Error {
return cmd.execute(cmd)
}

func (cmd *writePayloadCommand) transactionType() transactionType {
return ttPut
}

0 comments on commit 9de9f9e

Please sign in to comment.