Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify ClientError to trap an inner error if necessary. #19

Merged
merged 1 commit into from
Aug 27, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
@@ -6,14 +6,11 @@ import (
"strings"
)

const (
fmtErrClientInvalidRemoteAddress = "[Client] invalid RemoteAddress '%s'"
fmtErrClientInvalidRemoteAddressWithErr = "[Client] invalid RemoteAddress '%s', err '%v'"
)
const ErrClientInvalidRemoteAddress = "[Client] invalid RemoteAddress '%s'"

var (
ErrClientOptionsRequired = newClientError("[Client] options are required")
ErrClientMissingRequiredData = newClientError("[Client] options must specify either a Cluster or a set of RemoteAddresses")
ErrClientOptionsRequired = newClientError("[Client] options are required", nil)
ErrClientMissingRequiredData = newClientError("[Client] options must specify either a Cluster or a set of RemoteAddresses", nil)
)

// Client object contains your cluster object
@@ -83,7 +80,7 @@ func newClientUsingAddresses(port uint16, remoteAddresses []string) (*Client, er
s := strings.SplitN(ra, ":", 2)
switch len(s) {
case 0:
return nil, newClientError(fmt.Sprintf(fmtErrClientInvalidRemoteAddress, ra))
return nil, newClientError(fmt.Sprintf(ErrClientInvalidRemoteAddress, ra), nil)
case 1:
if port > 0 {
nopts.RemoteAddress = fmt.Sprintf("%s:%d", s[0], port)
@@ -92,12 +89,12 @@ func newClientUsingAddresses(port uint16, remoteAddresses []string) (*Client, er
}
case 2:
if p, err := strconv.Atoi(s[1]); err != nil {
return nil, newClientError(fmt.Sprintf(fmtErrClientInvalidRemoteAddressWithErr, ra, err))
return nil, newClientError(ErrClientInvalidRemoteAddress, err)
} else {
nopts.RemoteAddress = fmt.Sprintf("%s:%d", s[0], p)
}
default:
return nil, newClientError(fmt.Sprintf(fmtErrClientInvalidRemoteAddress, ra))
return nil, newClientError(fmt.Sprintf(ErrClientInvalidRemoteAddress, ra), nil)
}
if node, err := NewNode(nopts); err != nil {
return nil, err
32 changes: 17 additions & 15 deletions cluster.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package riak

import (
"fmt"
"sync"
"time"
)

@@ -40,14 +39,15 @@ type Cluster struct {

// Cluster errors
var (
ErrClusterCommandRequired = newClientError("[Cluster] Command must be non-nil")
ErrClusterAsyncRequiresChannelOrWaitGroup = newClientError("[Cluster] ExecuteAsync argument requires a channel or sync.WaitGroup to indicate completion")
ErrClusterNoNodesAvailable = newClientError("[Cluster] all retries exhausted and/or no nodes available to execute command")
ErrClusterEnqueueWhileShuttingDown = newClientError("[Cluster] will not enqueue command, shutting down")
ErrClusterShuttingDown = newClientError("[Cluster] will not execute command, shutting down")
ErrClusterNodeMustBeNonNil = newClientError("[Cluster] node argument must be non-nil")
ErrClusterCommandRequired = newClientError("[Cluster] Command must be non-nil", nil)
ErrClusterAsyncRequiresChannelOrWaitGroup = newClientError("[Cluster] ExecuteAsync argument requires a channel or sync.WaitGroup to indicate completion", nil)
ErrClusterEnqueueWhileShuttingDown = newClientError("[Cluster] will not enqueue command, shutting down", nil)
ErrClusterShuttingDown = newClientError("[Cluster] will not execute command, shutting down", nil)
ErrClusterNodeMustBeNonNil = newClientError("[Cluster] node argument must be non-nil", nil)
)

const ErrClusterNoNodesAvailable = "[Cluster] all retries exhausted and/or no nodes available to execute command"

var defaultClusterOptions = &ClusterOptions{
Nodes: make([]*Node, 0),
NodeManager: &defaultNodeManager{},
@@ -227,7 +227,7 @@ func (c *Cluster) RemoveNode(n *Node) error {
}

// Execute (asynchronously) the provided Command against the active pooled Nodes using the NodeManager
func (c *Cluster) ExecuteAsync(async *Async) (err error) {
func (c *Cluster) ExecuteAsync(async *Async) error {
if async.Command == nil {
return ErrClusterCommandRequired
}
@@ -242,18 +242,20 @@ func (c *Cluster) ExecuteAsync(async *Async) (err error) {
}

// Execute (synchronously) the provided Command against the active pooled Nodes using the NodeManager
func (c *Cluster) Execute(command Command) (err error) {
func (c *Cluster) Execute(command Command) error {
if command == nil {
return ErrClusterCommandRequired
}
wg := &sync.WaitGroup{}
wg.Add(1)
async := &Async{
Command: command,
Wait: wg,
}
go c.execute(async)
wg.Wait()
c.execute(async)
if async.Error != nil {
return async.Error
}
if cerr := command.Error(); cerr != nil {
return cerr
}
return nil
}

@@ -309,7 +311,7 @@ func (c *Cluster) execute(async *Async) {
if command.hasRemainingTries() {
async.onRetry()
} else {
err = ErrClusterNoNodesAvailable
err = newClientError(ErrClusterNoNodesAvailable, err)
}
}
if !enqueued {
1 change: 1 addition & 0 deletions command.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ type StreamingCommand interface {
type Command interface {
Name() string
Success() bool
Error() error
getRequestCode() byte
constructPbRequest() (proto.Message, error)
onError(error)
8 changes: 6 additions & 2 deletions command_impl.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package riak

type CommandImpl struct {
Error error
error error
success bool
remainingTries byte
lastNode *Node
@@ -11,11 +11,15 @@ func (cmd *CommandImpl) Success() bool {
return cmd.success == true
}

func (cmd *CommandImpl) Error() error {
return cmd.error
}

func (cmd *CommandImpl) onError(err error) {
cmd.success = false
// NB: only set error to the *last* error (retries)
if !cmd.hasRemainingTries() {
cmd.Error = err
cmd.error = err
}
}

6 changes: 3 additions & 3 deletions connection.go
Original file line number Diff line number Diff line change
@@ -244,12 +244,12 @@ func (c *connection) read() ([]byte, error) {
count, err = io.ReadFull(c.conn, c.dataBuf)
} else {
if err == nil && count != 4 {
err = newClientError(fmt.Sprintf("[Connection] expected to read 4 bytes, only read: %d", count))
err = newClientError(fmt.Sprintf("[Connection] expected to read 4 bytes, only read: %d", count), nil)
}
}

if err == nil && count != int(messageLength) {
err = newClientError(fmt.Sprintf("[Connection] message length: %d, only read: %d", messageLength, count))
err = newClientError(fmt.Sprintf("[Connection] message length: %d, only read: %d", messageLength, count), nil)
}

if err == nil {
@@ -275,7 +275,7 @@ func (c *connection) write(data []byte) error {
if count != len(data) {
// connection will eventually be expired
c.setState(connInactive)
return newClientError(fmt.Sprintf("[Connection] data length: %d, only wrote: %d", len(data), count))
return newClientError(fmt.Sprintf("[Connection] data length: %d, only wrote: %d", len(data), count), nil)
}
return nil
}
8 changes: 4 additions & 4 deletions connection_manager.go
Original file line number Diff line number Diff line change
@@ -44,10 +44,10 @@ type connectionManager struct {
}

var (
ErrConnectionManagerRequiresOptions = newClientError("[connectionManager] new manager requires options")
ErrConnectionManagerRequiresAddress = newClientError("[connectionManager] new manager requires non-nil address")
ErrConnectionManagerRequiresStopChannel = newClientError("[connectionManager] new manager requires non-nil stop channel")
ErrConnMgrAllConnectionsInUse = newClientError("[connectionManager] all connections in use at max connections reached")
ErrConnectionManagerRequiresOptions = newClientError("[connectionManager] new manager requires options", nil)
ErrConnectionManagerRequiresAddress = newClientError("[connectionManager] new manager requires non-nil address", nil)
ErrConnectionManagerRequiresStopChannel = newClientError("[connectionManager] new manager requires non-nil stop channel", nil)
ErrConnMgrAllConnectionsInUse = newClientError("[connectionManager] all connections in use at max connections reached", nil)
)

func newConnectionManager(options *connectionManagerOptions) (*connectionManager, error) {
4 changes: 2 additions & 2 deletions crdt_commands.go
Original file line number Diff line number Diff line change
@@ -1171,10 +1171,10 @@ func (builder *UpdateMapCommandBuilder) Build() (Command, error) {
return nil, err
}
if builder.mapOperation == nil {
return nil, newClientError("UpdateMapCommandBuilder requires non-nil MapOperation. Use WithMapOperation()")
return nil, newClientError("UpdateMapCommandBuilder requires non-nil MapOperation. Use WithMapOperation()", nil)
}
if builder.mapOperation.hasRemoves(true) && builder.protobuf.GetContext() == nil {
return nil, newClientError("When doing any removes a context must be provided.")
return nil, newClientError("When doing any removes a context must be provided.", nil)
}
return &UpdateMapCommand{protobuf: builder.protobuf, op: builder.mapOperation}, nil
}
30 changes: 17 additions & 13 deletions error.go
Original file line number Diff line number Diff line change
@@ -38,27 +38,31 @@ func (e RiakError) Error() (s string) {

// Client errors
var (
ErrAddressRequired = newClientError("RemoteAddress is required in options")
ErrAuthMissingConfig = newClientError("[Connection] authentication is missing TLS config")
ErrAuthTLSUpgradeFailed = newClientError("[Connection] upgrading to TLS connection failed")
ErrBucketRequired = newClientError("Bucket is required")
ErrKeyRequired = newClientError("Key is required")
ErrNilOptions = newClientError("[Command] options must be non-nil")
ErrOptionsRequired = newClientError("Options are required")
ErrNoNodesAvailable = newClientError("No nodes available to execute command, or exhausted all tries")
ErrZeroLength = newClientError("[Command] 0 byte data response")
ErrAddressRequired = newClientError("RemoteAddress is required in options", nil)
ErrAuthMissingConfig = newClientError("[Connection] authentication is missing TLS config", nil)
ErrAuthTLSUpgradeFailed = newClientError("[Connection] upgrading to TLS connection failed", nil)
ErrBucketRequired = newClientError("Bucket is required", nil)
ErrKeyRequired = newClientError("Key is required", nil)
ErrNilOptions = newClientError("[Command] options must be non-nil", nil)
ErrOptionsRequired = newClientError("Options are required", nil)
ErrZeroLength = newClientError("[Command] 0 byte data response", nil)
)

type ClientError struct {
Errmsg string
Errmsg string
InnerError error
}

func newClientError(errmsg string) error {
func newClientError(errmsg string, innerError error) error {
return ClientError{
Errmsg: errmsg,
Errmsg: errmsg,
InnerError: innerError,
}
}

func (e ClientError) Error() (s string) {
return fmt.Sprintf("ClientError|%s", e.Errmsg)
if e.InnerError == nil {
return fmt.Sprintf("ClientError|%s", e.Errmsg)
}
return fmt.Sprintf("ClientError|%s|InnerError|%v", e.Errmsg, e.InnerError)
}
10 changes: 5 additions & 5 deletions kv_commands.go
Original file line number Diff line number Diff line change
@@ -817,7 +817,7 @@ func (builder *ListBucketsCommandBuilder) Build() (Command, error) {
return nil, err
}
if builder.protobuf.GetStream() && builder.callback == nil {
return nil, newClientError("ListBucketsCommand requires a callback when streaming.")
return nil, newClientError("ListBucketsCommand requires a callback when streaming.", nil)
}
return &ListBucketsCommand{protobuf: builder.protobuf, callback: builder.callback}, nil
}
@@ -977,7 +977,7 @@ func (builder *ListKeysCommandBuilder) Build() (Command, error) {
return nil, err
}
if builder.streaming && builder.callback == nil {
return nil, newClientError("ListKeysCommand requires a callback when streaming.")
return nil, newClientError("ListKeysCommand requires a callback when streaming.", nil)
}
return &ListKeysCommand{
protobuf: builder.protobuf,
@@ -1350,10 +1350,10 @@ func (builder *SecondaryIndexQueryCommandBuilder) Build() (Command, error) {
}
if builder.protobuf.GetKey() == nil &&
(builder.protobuf.GetRangeMin() == nil || builder.protobuf.GetRangeMax() == nil) {
return nil, newClientError("either WithIndexKey or WithRange are required")
return nil, newClientError("either WithIndexKey or WithRange are required", nil)
}
if builder.protobuf.GetStream() && builder.callback == nil {
return nil, newClientError("SecondaryIndexQueryCommand requires a callback when streaming.")
return nil, newClientError("SecondaryIndexQueryCommand requires a callback when streaming.", nil)
}
return &SecondaryIndexQueryCommand{
protobuf: builder.protobuf,
@@ -1480,7 +1480,7 @@ func (builder *MapReduceCommandBuilder) Build() (Command, error) {
panic("builder.protobuf must not be nil")
}
if builder.streaming && builder.callback == nil {
return nil, newClientError("MapReduceCommand requires a callback when streaming.")
return nil, newClientError("MapReduceCommand requires a callback when streaming.", nil)
}
return &MapReduceCommand{
protobuf: builder.protobuf,
6 changes: 3 additions & 3 deletions kv_commands_i_test.go
Original file line number Diff line number Diff line change
@@ -653,10 +653,10 @@ func TestMapReduceCommand(t *testing.T) {
if err = cluster.Execute(cmd); err != nil {
t.Fatal(err.Error())
}
if cerr := cmd.Error(); cerr != nil {
t.Fatal(cerr)
}
if mr, ok := cmd.(*MapReduceCommand); ok {
if mr.Error != nil {
t.Fatal(mr.Error.Error())
}
if mr.Response == nil || len(mr.Response) == 0 {
t.Error("expected non-nil and non-empty response")
}
2 changes: 1 addition & 1 deletion node_manager.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ type NodeManager interface {
Stop()
}

var ErrDefaultNodeManagerRequiresNode = newClientError("Must pass at least one node to default node manager")
var ErrDefaultNodeManagerRequiresNode = newClientError("Must pass at least one node to default node manager", nil)

type defaultNodeManager struct {
qsz uint16
4 changes: 2 additions & 2 deletions queue.go
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ func (q *queue) enqueue(v interface{}) error {

func (q *queue) _do_enqueue(v interface{}) error {
if len(q.queueChan) == int(q.queueSize) {
return newClientError("attempt to enqueue when queue is full")
return newClientError("attempt to enqueue when queue is full", nil)
}
q.queueChan <- v
return nil
@@ -45,7 +45,7 @@ func (q *queue) _do_dequeue() (interface{}, error) {
select {
case v, ok := <-q.queueChan:
if !ok {
return nil, newClientError("attempt to dequeue from closed queue")
return nil, newClientError("attempt to dequeue from closed queue", nil)
}
return v, nil
default:
2 changes: 1 addition & 1 deletion rpb.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ func rpbValidateResp(data []byte, expected byte) (err error) {

func rpbEnsureCode(expected byte, actual byte) (err error) {
if expected != actual {
err = newClientError(fmt.Sprintf("expected response code %d, got: %d", expected, actual))
err = newClientError(fmt.Sprintf("expected response code %d, got: %d", expected, actual), nil)
}
return
}