Skip to content

Commit

Permalink
feature: Extend operation errors with correlation ID and node URL
Browse files Browse the repository at this point in the history
  • Loading branch information
skambalin committed Oct 7, 2024
1 parent 5a691b6 commit d795520
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 57 deletions.
24 changes: 13 additions & 11 deletions packages/ddc-client/src/DdcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,19 @@ export class DdcClient {

logger.debug(config, 'DdcClient created');

bindErrorLogger(this, this.logger, [
'getBalance',
'depositBalance',
'getDeposit',
'createBucket',
'getBucket',
'getBucketList',
'store',
'read',
'resolveName',
]);
if (config.logErrors !== false) {
bindErrorLogger(this, this.logger, [
'getBalance',
'depositBalance',
'getDeposit',
'createBucket',
'getBucket',
'getBucketList',
'store',
'read',
'resolveName',
]);
}
}

/**
Expand Down
2 changes: 2 additions & 0 deletions packages/ddc-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export {
MAINNET,
AuthToken,
AuthTokenOperation,
StorageNodeMode,
NodeError,
type DagNodeStoreOptions,
type Signer,
} from '@cere-ddc-sdk/ddc';
Expand Down
5 changes: 5 additions & 0 deletions packages/ddc/src/logger/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ export type LoggerOptions = {
logLevel?: LogLevel;
logOptions?: LoggerConfig;
logger?: Logger;

/**
* Wether to log all errors (including caught ones)
*/
logErrors?: boolean;
};

export type { Logger };
81 changes: 57 additions & 24 deletions packages/ddc/src/nodes/BalancedNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ import { Router, RouterOperation } from '../routing';
import { Piece, MultipartPiece } from '../Piece';
import { DagNode } from '../DagNode';
import { CnsRecord } from '../CnsRecord';
import { Logger, LoggerOptions, createLogger } from '../logger';
import { Logger, LoggerOptions, bindErrorLogger, createLogger } from '../logger';
import { createCorrelationId } from '../activity';
import { NodeError } from './NodeError';
import {
DagNodeGetOptions,
DagNodeStoreOptions,
PieceReadOptions,
PieceStoreOptions,
NodeInterface,
CnsRecordGetOptions,
CorrelationOptions,
} from './NodeInterface';

/**
Expand Down Expand Up @@ -73,6 +76,18 @@ export class BalancedNode implements NodeInterface {

this.retryOptions = { ...this.retryOptions, ...retryOptions, retries: attempts };
}

if (config.logErrors !== false) {
bindErrorLogger(this, this.logger, [
'storePiece',
'storeDagNode',
'readPiece',
'getDagNode',
'storeCnsRecord',
'getCnsRecord',
'resolveName',
]);
}
}

/**
Expand All @@ -86,9 +101,12 @@ export class BalancedNode implements NodeInterface {
private async withRetry<T>(
bucketId: BucketId,
operation: RouterOperation,
{ correlationId = createCorrelationId() }: CorrelationOptions,
body: (node: NodeInterface, bail: (e: Error) => void, attempt: number) => Promise<T>,
) {
let lastError: RpcError | undefined;
let lastOperationError: RpcError | undefined;
let lastRouterError: Error | undefined;

const exclude: NodeInterface[] = [];

return retry(
Expand All @@ -99,6 +117,7 @@ export class BalancedNode implements NodeInterface {
node = await this.router.getNode(
operation,
bucketId,
{ logErrors: false },
exclude.map((node) => node.nodeId),
);

Expand All @@ -117,25 +136,33 @@ export class BalancedNode implements NodeInterface {
node.displayName,
);
}

if (error instanceof Error) {
lastRouterError = error;
}
}

if (!node) {
throw lastError ?? new Error('No nodes available to handle the operation');
throw lastOperationError ?? lastRouterError ?? new Error('No nodes available to handle the operation');
}

try {
return await body(node, bail, attempt);
} catch (error) {
if (
error instanceof RpcError &&
RETRYABLE_GRPC_ERROR_CODES.map((status) => GrpcStatus[status]).includes(error.code)
) {
lastError = error;
const nodeError = error instanceof RpcError ? NodeError.fromRpcError(error) : undefined;

throw error;
if (nodeError) {
nodeError.nodeId = node.nodeId;
nodeError.correlationId = correlationId;

if (RETRYABLE_GRPC_ERROR_CODES.map((status) => GrpcStatus[status]).includes(nodeError.code)) {
lastOperationError = nodeError;

throw nodeError;
}
}

bail(error as Error);
bail(nodeError || (error as Error));
}
},
{
Expand All @@ -149,8 +176,8 @@ export class BalancedNode implements NodeInterface {
) as T;
}

async storePiece(bucketId: BucketId, piece: Piece | MultipartPiece, options?: PieceStoreOptions) {
return this.withRetry(bucketId, RouterOperation.STORE_PIECE, (node, bail, attempt) =>
async storePiece(bucketId: BucketId, piece: Piece | MultipartPiece, options: PieceStoreOptions = {}) {
return this.withRetry(bucketId, RouterOperation.STORE_PIECE, options, (node, bail, attempt) =>
/**
* Clone the piece if it is a piece and this is not the first attempt.
* This is done to avoid reusing the same stream multiple times.
Expand All @@ -159,32 +186,38 @@ export class BalancedNode implements NodeInterface {
);
}

async readPiece(bucketId: BucketId, cidOrName: string, options?: PieceReadOptions) {
return this.withRetry(bucketId, RouterOperation.READ_PIECE, (node) => node.readPiece(bucketId, cidOrName, options));
async readPiece(bucketId: BucketId, cidOrName: string, options: PieceReadOptions = {}) {
return this.withRetry(bucketId, RouterOperation.READ_PIECE, options, (node) =>
node.readPiece(bucketId, cidOrName, options),
);
}

async storeDagNode(bucketId: BucketId, dagNode: DagNode, options?: DagNodeStoreOptions) {
return this.withRetry(bucketId, RouterOperation.STORE_DAG_NODE, (node) =>
async storeDagNode(bucketId: BucketId, dagNode: DagNode, options: DagNodeStoreOptions = {}) {
return this.withRetry(bucketId, RouterOperation.STORE_DAG_NODE, options, (node) =>
node.storeDagNode(bucketId, dagNode, options),
);
}

async getDagNode(bucketId: BucketId, cidOrName: string, options?: DagNodeGetOptions) {
return this.withRetry(bucketId, RouterOperation.READ_DAG_NODE, (node) =>
async getDagNode(bucketId: BucketId, cidOrName: string, options: DagNodeGetOptions = {}) {
return this.withRetry(bucketId, RouterOperation.READ_DAG_NODE, options, (node) =>
node.getDagNode(bucketId, cidOrName, options),
);
}

async storeCnsRecord(bucketId: BucketId, record: CnsRecord) {
return this.withRetry(bucketId, RouterOperation.STORE_CNS_RECORD, (node) => node.storeCnsRecord(bucketId, record));
async storeCnsRecord(bucketId: BucketId, record: CnsRecord, options: DagNodeStoreOptions = {}) {
return this.withRetry(bucketId, RouterOperation.STORE_CNS_RECORD, options, (node) =>
node.storeCnsRecord(bucketId, record),
);
}

async getCnsRecord(bucketId: BucketId, name: string) {
return this.withRetry(bucketId, RouterOperation.READ_CNS_RECORD, (node) => node.getCnsRecord(bucketId, name));
async getCnsRecord(bucketId: BucketId, name: string, options: CnsRecordGetOptions = {}) {
return this.withRetry(bucketId, RouterOperation.READ_CNS_RECORD, options, (node) =>
node.getCnsRecord(bucketId, name),
);
}

async resolveName(bucketId: BucketId, cidOrName: string, options?: CnsRecordGetOptions) {
return this.withRetry(bucketId, RouterOperation.READ_CNS_RECORD, (node) =>
async resolveName(bucketId: BucketId, cidOrName: string, options: CnsRecordGetOptions = {}) {
return this.withRetry(bucketId, RouterOperation.READ_CNS_RECORD, options, (node) =>
node.resolveName(bucketId, cidOrName, options),
);
}
Expand Down
15 changes: 15 additions & 0 deletions packages/ddc/src/nodes/NodeError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { RpcError } from '@protobuf-ts/runtime-rpc';

export class NodeError extends RpcError {
correlationId?: string;
nodeId?: string;

static fromRpcError(error: RpcError): NodeError {
const finalError = new NodeError(error.message, error.code, error.meta);

finalError.methodName = error.methodName;
finalError.serviceName = error.serviceName;

return finalError;
}
}
25 changes: 15 additions & 10 deletions packages/ddc/src/nodes/NodeInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@ type NamingOptions = {
name?: string;
};

type ActivityOptions = {
correlationId?: string;
};

type CacheControlOptions = {
cacheControl?: 'no-cache';
};

/**
* The `CorrelationOptions` type defines the correlation options for a DDC operation.
*
* @hidden
*/
export type CorrelationOptions = {
correlationId?: string;
};

/**
* The `OperationAuthOptions` type defines the authentication options for a DDC operation.
*
Expand All @@ -39,7 +44,7 @@ export type OperationAuthOptions = {
* @extends OperationAuthOptions
*/
export type PieceReadOptions = CacheControlOptions &
ActivityOptions &
CorrelationOptions &
OperationAuthOptions & {
/**
* An optional range to read from the piece.
Expand All @@ -54,7 +59,7 @@ export type PieceReadOptions = CacheControlOptions &
* @extends OperationAuthOptions
*/
export type DagNodeGetOptions = CacheControlOptions &
ActivityOptions &
CorrelationOptions &
OperationAuthOptions & {
/**
* An optional path to retrieve from the DAG node.
Expand All @@ -69,7 +74,7 @@ export type DagNodeGetOptions = CacheControlOptions &
* @extends OperationAuthOptions
*/
export type CnsRecordGetOptions = CacheControlOptions &
ActivityOptions &
CorrelationOptions &
OperationAuthOptions & {
/**
* An optional path to retrieve from the CNS record.
Expand All @@ -84,7 +89,7 @@ export type CnsRecordGetOptions = CacheControlOptions &
* @extends NamingOptions
* @extends OperationAuthOptions
*/
export type PieceStoreOptions = ActivityOptions & NamingOptions & OperationAuthOptions;
export type PieceStoreOptions = CorrelationOptions & NamingOptions & OperationAuthOptions;

/**
* The `DagNodeStoreOptions` type defines the options for storing a DAG node.
Expand All @@ -93,15 +98,15 @@ export type PieceStoreOptions = ActivityOptions & NamingOptions & OperationAuthO
* @extends NamingOptions
* @extends OperationAuthOptions
*/
export type DagNodeStoreOptions = ActivityOptions & NamingOptions & OperationAuthOptions;
export type DagNodeStoreOptions = CorrelationOptions & NamingOptions & OperationAuthOptions;

/**
* The `CnsRecordStoreOptions` type defines the options for storing a CNS record.
*
* @hidden
* @extends OperationAuthOptions
*/
export type CnsRecordStoreOptions = ActivityOptions & OperationAuthOptions;
export type CnsRecordStoreOptions = CorrelationOptions & OperationAuthOptions;

/**
* The `NodeInterface` interface defines the methods to interact with DDC storage nodes.
Expand Down
20 changes: 11 additions & 9 deletions packages/ddc/src/nodes/StorageNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,17 @@ export class StorageNode implements NodeInterface {

this.logger.debug(config, 'Storage node initialized');

bindErrorLogger(this, this.logger, [
'storePiece',
'storeDagNode',
'readPiece',
'getDagNode',
'storeCnsRecord',
'getCnsRecord',
'resolveName',
]);
if (config.logErrors !== false) {
bindErrorLogger(this, this.logger, [
'storePiece',
'storeDagNode',
'readPiece',
'getDagNode',
'storeCnsRecord',
'getCnsRecord',
'resolveName',
]);
}
}

private async getRootToken() {
Expand Down
1 change: 1 addition & 0 deletions packages/ddc/src/nodes/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './StorageNode';
export * from './NodeInterface';
export * from './BalancedNode';
export * from './NodeError';
10 changes: 8 additions & 2 deletions packages/ddc/src/routing/Router.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BucketId, Signer } from '@cere-ddc-sdk/blockchain';

import { StorageNode } from '../nodes';
import { StorageNode, StorageNodeConfig } from '../nodes';
import { RouterNode, RouterOperation, RoutingStrategy } from './RoutingStrategy';
import { BlockchainStrategy, BlockchainStrategyConfig } from './BlockchainStrategy';
import { StaticStrategy, StaticStrategyConfig } from './StaticStrategy';
Expand Down Expand Up @@ -68,7 +68,12 @@ export class Router {
*
* @throws Will throw an error if no nodes are available to handle the operation.
*/
async getNode(operation: RouterOperation, bucketId: BucketId, exclude: string[] = []) {
async getNode(
operation: RouterOperation,
bucketId: BucketId,
config: Partial<StorageNodeConfig> = {},
exclude: string[] = [],
) {
this.logger.info('Getting node for operation "%s" in bucket %s', operation, bucketId);

const sdkTokenPromise = this.getSdkToken();
Expand All @@ -88,6 +93,7 @@ export class Router {
logger: this.logger,
authToken: await sdkTokenPromise,
nodeId: node.nodeId || node.grpcUrl,
...config,
});

this.logger.info(`Selected node for operation "%s" in bucket %s: %s`, operation, bucketId, storageNode.displayName);
Expand Down
Loading

0 comments on commit d795520

Please sign in to comment.