Skip to content

Commit

Permalink
[CLIENT-3112] Correctly handle new error messages/error codes returne…
Browse files Browse the repository at this point in the history
…d by AS 7.2
  • Loading branch information
khaf committed Sep 13, 2024
1 parent 4ed558d commit d7fdaad
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 43 deletions.
79 changes: 42 additions & 37 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,23 +861,31 @@ func (clnt *Client) RegisterUDF(policy *WritePolicy, udfBody []byte, serverPath
}

response := responseMap[strCmd.String()]
if strings.EqualFold(response, "ok") {
return NewRegisterTask(clnt.cluster, serverPath), nil
}

err = parseInfoErrorCode(response)

res := make(map[string]string)
vals := strings.Split(response, ";")
vals := strings.Split("error="+err.Error(), ";")
for _, pair := range vals {
t := strings.SplitN(pair, "=", 2)
if len(t) == 2 {
res[t[0]] = t[1]
res[strings.ToLower(t[0])] = t[1]
} else if len(t) == 1 {
res[t[0]] = ""
res[strings.ToLower(t[0])] = ""
}
}

if _, exists := res["error"]; exists {
msg, _ := base64.StdEncoding.DecodeString(res["message"])
return nil, newError(types.COMMAND_REJECTED, fmt.Sprintf("Registration failed: %s\nFile: %s\nLine: %s\nMessage: %s",
return nil, newError(err.resultCode(), fmt.Sprintf("Registration failed: %s\nFile: %s\nLine: %s\nMessage: %s",
res["error"], res["file"], res["line"], msg))
}
return NewRegisterTask(clnt.cluster, serverPath), nil

// if message was not parsable
return nil, parseInfoErrorCode(response)
}

// RemoveUDF removes a package containing user defined functions in the server.
Expand All @@ -903,10 +911,10 @@ func (clnt *Client) RemoveUDF(policy *WritePolicy, udfName string) (*RemoveTask,
}

response := responseMap[strCmd.String()]
if response == "ok" {
if strings.EqualFold(response, "ok") {
return NewRemoveTask(clnt.cluster, udfName), nil
}
return nil, newError(types.SERVER_ERROR, response)
return nil, parseInfoErrorCode(response)
}

// ListUDF lists all packages containing user defined functions in the server.
Expand Down Expand Up @@ -1134,36 +1142,34 @@ func (clnt *Client) SetXDRFilter(policy *InfoPolicy, datacenter string, namespac
return nil
}

return parseIndexErrorCode(response)
return parseInfoErrorCode(response)
}

var indexErrRegexp = regexp.MustCompile(`(?i)(fail|error)(:[0-9]+)?(:.+)?`)
var infoErrRegexp = regexp.MustCompile(`(?i)(fail|error)((:|=)(?P<code>[0-9]+))?((:|=)(?P<msg>.+))?`)

func parseInfoErrorCode(response string) Error {
match := infoErrRegexp.FindStringSubmatch(response)

func parseIndexErrorCode(response string) Error {
var code = types.SERVER_ERROR
var message = response

match := indexErrRegexp.FindStringSubmatch(response)

// invalid response
if len(match) != 4 {
return newError(types.PARSE_ERROR, response)
}

// error code
if len(match[2]) > 0 {
i, err := strconv.ParseInt(string(match[2][1:]), 10, 64)
if err == nil {
code = types.ResultCode(i)
message = types.ResultCodeToString(code)
if len(match) > 0 {
for i, name := range infoErrRegexp.SubexpNames() {
if i != 0 && name != "" && len(match[i]) > 0 {
switch name {
case "code":
i, err := strconv.ParseInt(match[i], 10, 64)
if err == nil {
code = types.ResultCode(i)
message = types.ResultCodeToString(code)
}
case "msg":
message = match[i]
}
}
}
}

// message
if len(match[3]) > 0 {
message = string(match[3][1:])
}

return newError(code, message)
}

Expand Down Expand Up @@ -1325,7 +1331,7 @@ func (clnt *Client) CreateComplexIndex(
return NewIndexTask(clnt.cluster, namespace, indexName), nil
}

return nil, parseIndexErrorCode(response)
return nil, parseInfoErrorCode(response)
}

// DropIndex deletes a secondary index. It will block until index is dropped on all nodes.
Expand Down Expand Up @@ -1363,7 +1369,7 @@ func (clnt *Client) DropIndex(
return <-task.OnComplete()
}

err = parseIndexErrorCode(response)
err = parseInfoErrorCode(response)
if err.Matches(types.INDEX_NOTFOUND) {
// Index did not previously exist. Return without error.
return nil
Expand All @@ -1381,11 +1387,6 @@ func (clnt *Client) DropIndex(
func (clnt *Client) Truncate(policy *InfoPolicy, namespace, set string, beforeLastUpdate *time.Time) Error {
policy = clnt.getUsableInfoPolicy(policy)

node, err := clnt.cluster.GetRandomNode()
if err != nil {
return err
}

var strCmd bytes.Buffer
if len(set) > 0 {
strCmd.WriteString("truncate:namespace=")
Expand All @@ -1401,13 +1402,17 @@ func (clnt *Client) Truncate(policy *InfoPolicy, namespace, set string, beforeLa
strCmd.WriteString(strconv.FormatInt(beforeLastUpdate.UnixNano(), 10))
}

responseMap, err := node.RequestInfo(policy, strCmd.String())
responseMap, err := clnt.sendInfoCommand(policy.Timeout, strCmd.String())
if err != nil {
return err
}

response := responseMap[strCmd.String()]
if strings.EqualFold(response, "OK") {
return nil
}

return newError(types.SERVER_ERROR, "Truncate failed: "+response)
return parseInfoErrorCode(response)
}

//-------------------------------------------------------
Expand Down
10 changes: 6 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,17 @@ var _ = gg.Describe("Aerospike", func() {

var actualClusterName string

gg.Describe("Client IndexErrorParser", func() {
gg.Describe("Client InfoErrorParser", func() {

gg.It("must parse IndexError response strings", func() {
gg.It("must parse InfoError response strings", func() {
type t struct {
r string
code types.ResultCode
err string
}

responses := []t{
{"invalid", types.PARSE_ERROR, "invalid"},
{"invalid", types.SERVER_ERROR, "invalid"},
{"FAIL", types.SERVER_ERROR, "FAIL"},
{"FAiL", types.SERVER_ERROR, "FAiL"},
{"Error", types.SERVER_ERROR, "Error"},
Expand All @@ -83,10 +83,12 @@ var _ = gg.Describe("Aerospike", func() {
{"ERROR:200", types.INDEX_FOUND, "Index already exists"},
{"FAIL:201", types.INDEX_NOTFOUND, "Index not found"},
{"FAIL:201:some message from the server", types.INDEX_NOTFOUND, "some message from the server"},
{"FAIL:some message from the server", types.SERVER_ERROR, "some message from the server"},
{"error:some message from the server", types.SERVER_ERROR, "some message from the server"},
}

for _, r := range responses {
err := as.ParseIndexErrorCode(r.r)
err := as.ParseInfoErrorCode(r.r)
gm.Expect(err).To(gm.HaveOccurred())
gm.Expect(err.(*as.AerospikeError).Msg()).To(gm.Equal(r.err))
gm.Expect(err.Matches(r.code)).To(gm.BeTrue())
Expand Down
4 changes: 2 additions & 2 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

package aerospike

func ParseIndexErrorCode(response string) Error {
return parseIndexErrorCode(response)
func ParseInfoErrorCode(response string) Error {
return parseInfoErrorCode(response)
}

func (e *AerospikeError) Msg() string {
Expand Down
14 changes: 14 additions & 0 deletions udf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ function getRecordKeyValue(rec)
end
`

const invalidUdfBody = `function testFunc1(rec, div)
asdf
returned ret -- Return the Return value and/or status
end`

// ALL tests are isolated by SetName and Key, which are 50 random characters
var _ = gg.Describe("UDF/Query tests", func() {

Expand Down Expand Up @@ -96,6 +101,15 @@ var _ = gg.Describe("UDF/Query tests", func() {
gm.Expect(<-regTask.OnComplete()).NotTo(gm.HaveOccurred())
})

gg.It("must parse invalid UDF error", func() {
_, err := client.RegisterUDF(wpolicy, []byte(invalidUdfBody), "invalid_udf1.lua", as.LUA)
gm.Expect(err).To(gm.HaveOccurred())
gm.Expect(err.Error()).To(gm.HaveSuffix(`compile_error
File: invalid_udf1.lua
Line: 3
Message: syntax error near 'returned'`))
})

gg.It("must run a UDF on a single record", func() {
registerUDF(udfBody, "udf1.lua")

Expand Down

0 comments on commit d7fdaad

Please sign in to comment.