Skip to content

Commit

Permalink
[CLIENT-2726] Proxy doesn't handle invalid filter expression error in…
Browse files Browse the repository at this point in the history
… query
  • Loading branch information
khaf committed Jan 18, 2024
1 parent a2181be commit cedab19
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 36 deletions.
7 changes: 1 addition & 6 deletions batch_command_operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type batchCommandOperate struct {
// pointer to the object that's going to be unmarshalled
objects []*reflect.Value
objectsFound []bool

grpcEOS bool
}

func newBatchCommandOperate(
Expand Down Expand Up @@ -284,10 +282,7 @@ func (cmd *batchCommandOperate) ExecuteGRPC(clnt *ProxyClient) Error {
return res.Payload, e
}

if !res.HasNext {
cmd.grpcEOS = true
return res.Payload, nil
}
cmd.grpcEOS = !res.HasNext

return res.Payload, nil
}
Expand Down
4 changes: 2 additions & 2 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,8 @@ var _ = gg.Describe("Aerospike", func() {
})

gg.It("Overall command error should be reflected in API call error and not BatchRecord error", func() {
if *dbaas {
gg.Skip("Not supported in DBAAS environment")
if *dbaas || *proxy {
gg.Skip("Not supported in DBAAS or PROXY environments")
}

var batchRecords []as.BatchRecordIfc
Expand Down
7 changes: 4 additions & 3 deletions multi_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ type baseMultiCommand struct {
resObjMappings map[string][]int
selectCases []reflect.SelectCase

bc bufferedConn
bc bufferedConn
grpcEOS bool
}

var multiObjectParser func(
Expand Down Expand Up @@ -354,7 +355,7 @@ func (cmd *baseMultiCommand) parseRecordResults(ifc command, receiveSize int) (b
}
}

if !cmd.tracker.allowRecord(cmd.nodePartitions) {
if cmd.grpcEOS || !cmd.tracker.allowRecord(cmd.nodePartitions) {
continue
}

Expand All @@ -380,7 +381,7 @@ func (cmd *baseMultiCommand) parseRecordResults(ifc command, receiveSize int) (b
return false, err
}

if !cmd.tracker.allowRecord(cmd.nodePartitions) {
if cmd.grpcEOS || !cmd.tracker.allowRecord(cmd.nodePartitions) {
continue
}

Expand Down
26 changes: 14 additions & 12 deletions proxy_query_partition_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func (cmd *grpcQueryPartitionCommand) ExecuteGRPC(clnt *ProxyClient) Error {
cmd.commandWasSent = true

readCallback := func() ([]byte, Error) {
if cmd.grpcEOS {
return nil, errGRPCStreamEnd
}

res, gerr := streamRes.Recv()
if gerr != nil {
e := newGrpcError(gerr)
Expand All @@ -124,18 +128,7 @@ func (cmd *grpcQueryPartitionCommand) ExecuteGRPC(clnt *ProxyClient) Error {
return res.Payload, e
}

if !res.HasNext {
done, err := cmd.tracker.isComplete(false, &cmd.policy.BasePolicy, []*nodePartitions{cmd.nodePartitions})
if !cmd.recordset.IsActive() || done || err != nil {
// Query is complete.
if err != nil {
cmd.tracker.partitionError()
cmd.recordset.sendError(err)
}
}

return nil, errGRPCStreamEnd
}
cmd.grpcEOS = !res.HasNext

return res.Payload, nil
}
Expand All @@ -147,6 +140,15 @@ func (cmd *grpcQueryPartitionCommand) ExecuteGRPC(clnt *ProxyClient) Error {
return err
}

done, err := cmd.tracker.isComplete(false, &cmd.policy.BasePolicy, []*nodePartitions{cmd.nodePartitions})
if !cmd.recordset.IsActive() || done || err != nil {
// Query is complete.
if err != nil {
cmd.tracker.partitionError()
cmd.recordset.sendError(err)
}
}

clnt.returnGrpcConnToPool(conn)

return nil
Expand Down
28 changes: 15 additions & 13 deletions proxy_scan_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,32 +114,25 @@ func (cmd *grpcScanPartitionCommand) ExecuteGRPC(clnt *ProxyClient) Error {
cmd.commandWasSent = true

readCallback := func() ([]byte, Error) {
if cmd.grpcEOS {
return nil, errGRPCStreamEnd
}

res, gerr := streamRes.Recv()
if gerr != nil {
e := newGrpcError(gerr)
cmd.recordset.sendError(e)
return nil, e
}

cmd.grpcEOS = !res.HasNext

if res.Status != 0 {
e := newGrpcStatusError(res)
cmd.recordset.sendError(e)
return res.Payload, e
}

if !res.HasNext {
done, err := cmd.tracker.isComplete(false, &cmd.policy.BasePolicy, []*nodePartitions{cmd.nodePartitions})
if !cmd.recordset.IsActive() || done || err != nil {
// Query is complete.
if err != nil {
cmd.tracker.partitionError()
cmd.recordset.sendError(err)
}
}

return nil, errGRPCStreamEnd
}

return res.Payload, nil
}

Expand All @@ -150,6 +143,15 @@ func (cmd *grpcScanPartitionCommand) ExecuteGRPC(clnt *ProxyClient) Error {
return err
}

done, err := cmd.tracker.isComplete(false, &cmd.policy.BasePolicy, []*nodePartitions{cmd.nodePartitions})
if !cmd.recordset.IsActive() || done || err != nil {
// Query is complete.
if err != nil {
cmd.tracker.partitionError()
cmd.recordset.sendError(err)
}
}

clnt.returnGrpcConnToPool(conn)

return nil
Expand Down

0 comments on commit cedab19

Please sign in to comment.