Skip to content

Commit

Permalink
Error handling for fetcher and keys packages (#442)
Browse files Browse the repository at this point in the history
* feat: error handling for fetcher and keys

Signed-off-by: Jingfu Wang <jingfu.wang@coinbase.com>

* fix: make gen

Signed-off-by: Jingfu Wang <jingfu.wang@coinbase.com>

Signed-off-by: Jingfu Wang <jingfu.wang@coinbase.com>
  • Loading branch information
GeekArthur authored Sep 1, 2022
1 parent 18b582a commit 64419e3
Show file tree
Hide file tree
Showing 18 changed files with 151 additions and 143 deletions.
12 changes: 6 additions & 6 deletions fetcher/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (f *Fetcher) AccountBalance(
) (*types.BlockIdentifier, []*types.Amount, map[string]interface{}, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, nil, nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -57,7 +57,7 @@ func (f *Fetcher) AccountBalance(
); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf(
"%w: /account/balance",
"/account/balance response is invalid: %w",
err,
),
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func (f *Fetcher) AccountBalanceRetry(

if is, _ := asserter.Err(err.Err); is {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /account/balance not attempting retry", err.Err),
Err: fmt.Errorf("/account/balance not attempting retry: %w", err.Err),
ClientErr: err.ClientErr,
}
return nil, nil, nil, fetcherErr
Expand All @@ -129,7 +129,7 @@ func (f *Fetcher) AccountCoins(
) (*types.BlockIdentifier, []*types.Coin, map[string]interface{}, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, nil, nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -151,7 +151,7 @@ func (f *Fetcher) AccountCoins(
); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf(
"%w: /account/coins",
"/account/coins response is invalid: %w",
err,
),
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func (f *Fetcher) AccountCoinsRetry(

if is, _ := asserter.Err(err.Err); is {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /account/coins not attempting retry", err.Err),
Err: fmt.Errorf("/account/coins not attempting retry: %w", err.Err),
ClientErr: err.ClientErr,
}
return nil, nil, nil, fetcherErr
Expand Down
8 changes: 4 additions & 4 deletions fetcher/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (f *Fetcher) fetchChannelTransactions(
// We keep the lock for all transactions we fetch in this goroutine.
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand Down Expand Up @@ -215,7 +215,7 @@ func (f *Fetcher) UnsafeBlock(
) (*types.Block, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand Down Expand Up @@ -274,7 +274,7 @@ func (f *Fetcher) Block(

if err := f.Asserter.Block(block); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /block", err),
Err: fmt.Errorf("/block response is invalid: %w", err),
}
return nil, fetcherErr
}
Expand Down Expand Up @@ -314,7 +314,7 @@ func (f *Fetcher) BlockRetry(

if is, _ := asserter.Err(err.Err); is {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /block not attempting retry", err.Err),
Err: fmt.Errorf("/block not attempting retry: %w", err.Err),
ClientErr: err.ClientErr,
}
return nil, fetcherErr
Expand Down
4 changes: 2 additions & 2 deletions fetcher/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (f *Fetcher) Call(
) (map[string]interface{}, bool, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, false, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand Down Expand Up @@ -84,7 +84,7 @@ func (f *Fetcher) CallRetry(

if is, _ := asserter.Err(err.Err); is {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /call not attempting retry", err.Err),
Err: fmt.Errorf("/call not attempting retry: %w", err.Err),
ClientErr: err.ClientErr,
}
return nil, false, fetcherErr
Expand Down
32 changes: 16 additions & 16 deletions fetcher/construction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (f *Fetcher) ConstructionCombine(
) (string, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return "", &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -53,7 +53,7 @@ func (f *Fetcher) ConstructionCombine(

if err := asserter.ConstructionCombineResponse(response); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /construction/combine", err),
Err: fmt.Errorf("/construction/combine response is invalid: %w", err),
}
return "", fetcherErr
}
Expand All @@ -74,7 +74,7 @@ func (f *Fetcher) ConstructionDerive(
) (*types.AccountIdentifier, map[string]interface{}, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -92,7 +92,7 @@ func (f *Fetcher) ConstructionDerive(

if err := asserter.ConstructionDeriveResponse(response); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /construction/derive", err),
Err: fmt.Errorf("/construction/derive response is invalid: %w", err),
}
return nil, nil, fetcherErr
}
Expand All @@ -109,7 +109,7 @@ func (f *Fetcher) ConstructionHash(
) (*types.TransactionIdentifier, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -126,7 +126,7 @@ func (f *Fetcher) ConstructionHash(

if err := asserter.TransactionIdentifierResponse(response); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /construction/hash", err),
Err: fmt.Errorf("/construction/hash response is invalid: %w", err),
}
return nil, fetcherErr
}
Expand All @@ -144,7 +144,7 @@ func (f *Fetcher) ConstructionMetadata(
) (map[string]interface{}, []*types.Amount, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -162,7 +162,7 @@ func (f *Fetcher) ConstructionMetadata(

if err := asserter.ConstructionMetadataResponse(metadata); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /construction/metadata", err),
Err: fmt.Errorf("/construction/metadata response is invalid: %w", err),
}
return nil, nil, fetcherErr
}
Expand All @@ -183,7 +183,7 @@ func (f *Fetcher) ConstructionParse(
) ([]*types.Operation, []*types.AccountIdentifier, map[string]interface{}, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, nil, nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -201,7 +201,7 @@ func (f *Fetcher) ConstructionParse(

if err := f.Asserter.ConstructionParseResponse(response, signed); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /construction/parse", err),
Err: fmt.Errorf("/construction/parse response is invalid: %w", err),
}
return nil, nil, nil, fetcherErr
}
Expand Down Expand Up @@ -229,7 +229,7 @@ func (f *Fetcher) ConstructionPayloads(
) (string, []*types.SigningPayload, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return "", nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -249,7 +249,7 @@ func (f *Fetcher) ConstructionPayloads(

if err := asserter.ConstructionPayloadsResponse(response); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /construction/payloads", err),
Err: fmt.Errorf("/construction/payloads response is invalid: %w", err),
}
return "", nil, fetcherErr
}
Expand All @@ -272,7 +272,7 @@ func (f *Fetcher) ConstructionPreprocess(
) (map[string]interface{}, []*types.AccountIdentifier, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -291,7 +291,7 @@ func (f *Fetcher) ConstructionPreprocess(

if err := asserter.ConstructionPreprocessResponse(response); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /construction/preprocess", err),
Err: fmt.Errorf("/construction/preprocess response is invalid: %w", err),
}
return nil, nil, fetcherErr
}
Expand All @@ -308,7 +308,7 @@ func (f *Fetcher) ConstructionSubmit(
) (*types.TransactionIdentifier, map[string]interface{}, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -326,7 +326,7 @@ func (f *Fetcher) ConstructionSubmit(

if err := asserter.TransactionIdentifierResponse(submitResponse); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /construction/submit", err),
Err: fmt.Errorf("/construction/submit response is invalid: %w", err),
}
return nil, nil, fetcherErr
}
Expand Down
7 changes: 1 addition & 6 deletions fetcher/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (f *Fetcher) RequestFailedError(
}

return &Error{
Err: fmt.Errorf("%w: %s %s", ErrRequestFailed, message, err.Error()),
Err: fmt.Errorf("%s %s: %w", message, err.Error(), ErrRequestFailed),
ClientErr: rosettaErr,
Retry: ((rosettaErr != nil && rosettaErr.Retriable) || transientError(err) || f.forceRetry) &&
!errors.Is(err, context.Canceled),
Expand All @@ -78,10 +78,6 @@ var (
// ErrExhaustedRetries is returned when a request with retries
// fails because it was attempted too many times.
ErrExhaustedRetries = errors.New("retries exhausted")

// ErrCouldNotAcquireSemaphore is returned when acquiring
// the connection semaphore returns an error.
ErrCouldNotAcquireSemaphore = errors.New("could not acquire semaphore")
)

// Err takes an error as an argument and returns
Expand All @@ -92,7 +88,6 @@ func Err(err error) bool {
ErrNetworkMissing,
ErrRequestFailed,
ErrExhaustedRetries,
ErrCouldNotAcquireSemaphore,
}

return utils.FindError(fetcherErrors, err)
Expand Down
6 changes: 3 additions & 3 deletions fetcher/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (f *Fetcher) EventsBlocks(
) (int64, []*types.BlockEvent, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return -1, nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -53,7 +53,7 @@ func (f *Fetcher) EventsBlocks(
); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf(
"%w: /events/blocks",
"/events/blocks response is invalid: %w",
err,
),
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func (f *Fetcher) EventsBlocksRetry(

if is, _ := asserter.Err(err.Err); is {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /events/blocks not attempting retry", err.Err),
Err: fmt.Errorf("/events/blocks not attempting retry: %w", err.Err),
ClientErr: err.ClientErr,
}
return -1, nil, fetcherErr
Expand Down
4 changes: 2 additions & 2 deletions fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ func (f *Fetcher) InitializeAsserter(
if !exists {
return nil, nil, &Error{
Err: fmt.Errorf(
"%w: %s not in %s",
ErrNetworkMissing,
"%s not in %s: %w",
types.PrintStruct(networkIdentifier),
types.PrintStruct(supportedNetworks),
ErrNetworkMissing,
),
}
}
Expand Down
8 changes: 4 additions & 4 deletions fetcher/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (f *Fetcher) Mempool(
) ([]*types.TransactionIdentifier, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -48,7 +48,7 @@ func (f *Fetcher) Mempool(
mempool := response.TransactionIdentifiers
if err := asserter.MempoolTransactions(mempool); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /mempool", err),
Err: fmt.Errorf("/mempool response is invalid: %w", err),
}
return nil, fetcherErr
}
Expand All @@ -65,7 +65,7 @@ func (f *Fetcher) MempoolTransaction(
) (*types.Transaction, map[string]interface{}, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
return nil, nil, &Error{
Err: fmt.Errorf("%w: %s", ErrCouldNotAcquireSemaphore, err.Error()),
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -84,7 +84,7 @@ func (f *Fetcher) MempoolTransaction(
mempoolTransaction := response.Transaction
if err := f.Asserter.Transaction(mempoolTransaction); err != nil {
fetcherErr := &Error{
Err: fmt.Errorf("%w: /mempool/transaction", err),
Err: fmt.Errorf("/mempool/transaction response is invalid: %w", err),
}
return nil, nil, fetcherErr
}
Expand Down
Loading

0 comments on commit 64419e3

Please sign in to comment.