Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion redisinsight/api/src/__mocks__/rdi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
RdiPipeline,
RdiStatisticsData,
} from 'src/modules/rdi/models';
import { ApiRdiClient } from 'src/modules/rdi/client/api.rdi.client';
import { ApiRdiClient } from 'src/modules/rdi/client/api/v1/api.rdi.client';
import { RdiEntity } from 'src/modules/rdi/entities/rdi.entity';
import { EncryptionStrategy } from 'src/modules/encryption/models';
import { RdiDryRunJobDto } from 'src/modules/rdi/dto';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ import {
mockRdiUnauthorizedError,
} from 'src/__mocks__';
import { sign } from 'jsonwebtoken';
import { ApiRdiClient } from './api.rdi.client';
import { RdiDyRunJobStatus, RdiPipeline, RdiStatisticsStatus } from '../models';
import { PipelineActions, RdiUrl, TOKEN_THRESHOLD } from '../constants';
import { ApiRdiClient } from 'src/modules/rdi/client/api/v1/api.rdi.client';
import {
RdiDyRunJobStatus,
RdiPipeline,
RdiStatisticsStatus,
} from 'src/modules/rdi/models';
import {
PipelineActions,
RdiUrl,
TOKEN_THRESHOLD,
} from 'src/modules/rdi/constants';

const mockedAxios = axios as jest.Mocked<typeof axios>;
jest.mock('axios');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import {
RdiPipelineInternalServerErrorException,
parseErrorMessage,
wrapRdiPipelineError,
RdiResetPipelineFailedException,
RdiStartPipelineFailedException,
RdiStopPipelineFailedException,
} from 'src/modules/rdi/exceptions';
import {
RdiPipeline,
Expand All @@ -42,9 +45,6 @@ import {
convertApiDataToRdiPipeline,
convertRdiPipelineToApiPayload,
} from 'src/modules/rdi/utils/pipeline.util';
import { RdiResetPipelineFailedException } from '../exceptions/rdi-reset-pipeline-failed.exception';
import { RdiStartPipelineFailedException } from '../exceptions/rdi-start-pipeline-failed.exception';
import { RdiStopPipelineFailedException } from '../exceptions/rdi-stop-pipeline-failed.exception';

interface ConnectionsConfig {
sources: Record<string, Record<string, unknown>>;
Expand All @@ -53,7 +53,7 @@ interface ConnectionsConfig {
export class ApiRdiClient extends RdiClient {
protected readonly client: AxiosInstance;

private readonly logger = new Logger('ApiRdiClient');
protected readonly logger = new Logger('ApiRdiClient');

private auth: { jwt: string; exp: number };

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import axios from 'axios';
import {
mockRdi,
mockRdiClientMetadata,
mockRdiUnauthorizedError,
} from 'src/__mocks__';
import { ApiV2RdiClient } from 'src/modules/rdi/client/api/v2/api.v2.rdi.client';
import { RdiUrlV2 } from 'src/modules/rdi/constants';
import { RdiInfo } from 'src/modules/rdi/models';
import { RdiPipelineInternalServerErrorException } from 'src/modules/rdi/exceptions';

const mockedAxios = axios as jest.Mocked<typeof axios>;
jest.mock('axios');
mockedAxios.create = jest.fn(() => mockedAxios);

describe('ApiV2RdiClient', () => {
let client: ApiV2RdiClient;

beforeEach(() => {
jest.clearAllMocks();
client = new ApiV2RdiClient(mockRdiClientMetadata, mockRdi);
});

describe('getInfo', () => {
it('should return RDI info when API call is successful', async () => {
const mockInfoResponse = { version: '2.0.1' };
const expectedRdiInfo = Object.assign(new RdiInfo(), {
version: '2.0.1',
});
mockedAxios.get.mockResolvedValueOnce({ data: mockInfoResponse });

const result = await client.getInfo();

expect(result).toEqual(expectedRdiInfo);
expect(mockedAxios.get).toHaveBeenCalledWith(RdiUrlV2.GetInfo);
});

it('should throw wrapped error when API call fails', async () => {
mockedAxios.get.mockRejectedValueOnce(mockRdiUnauthorizedError);

await expect(client.getInfo()).rejects.toThrow(
mockRdiUnauthorizedError.message,
);
expect(mockedAxios.get).toHaveBeenCalledWith(RdiUrlV2.GetInfo);
});

it('should transform response data to RdiInfo instance', async () => {
const mockInfoResponse = { version: '2.1.0' };
mockedAxios.get.mockResolvedValueOnce({ data: mockInfoResponse });

const result = await client.getInfo();

expect(result).toBeInstanceOf(RdiInfo);
expect(result.version).toBe('2.1.0');
});
});

describe('selectPipeline', () => {
it('should select first pipeline when pipelines are available', async () => {
const mockPipelinesResponse = [
{
name: 'pipeline-1',
active: true,
config: {},
status: 'running',
errors: [],
components: [],
current: true,
},
{
name: 'pipeline-2',
active: false,
config: {},
status: 'stopped',
errors: [],
components: [],
current: false,
},
];
mockedAxios.get.mockResolvedValueOnce({ data: mockPipelinesResponse });

await client.selectPipeline();

expect(mockedAxios.get).toHaveBeenCalledWith(RdiUrlV2.GetPipelines);
expect(client['selectedPipeline']).toBe('pipeline-1');
});

it('should throw RdiPipelineInternalServerErrorException when no pipelines available', async () => {
mockedAxios.get.mockResolvedValueOnce({ data: [] });

await expect(client.selectPipeline()).rejects.toThrow(
RdiPipelineInternalServerErrorException,
);
});

it('should throw error with message "Unable to select pipeline" when no pipelines available', async () => {
mockedAxios.get.mockResolvedValueOnce({ data: [] });

await expect(client.selectPipeline()).rejects.toThrow(
'Unable to select pipeline',
);
});

it('should throw RdiPipelineInternalServerErrorException when data is null', async () => {
mockedAxios.get.mockResolvedValueOnce({ data: null });

await expect(client.selectPipeline()).rejects.toThrow(
RdiPipelineInternalServerErrorException,
);
});

it('should throw RdiPipelineInternalServerErrorException when data is undefined', async () => {
mockedAxios.get.mockResolvedValueOnce({ data: undefined });

await expect(client.selectPipeline()).rejects.toThrow(
RdiPipelineInternalServerErrorException,
);
});

it('should throw wrapped error when API call fails', async () => {
mockedAxios.get.mockRejectedValueOnce(mockRdiUnauthorizedError);

await expect(client.selectPipeline()).rejects.toThrow(
mockRdiUnauthorizedError.message,
);
expect(mockedAxios.get).toHaveBeenCalledWith(RdiUrlV2.GetPipelines);
});

it('should select first pipeline even when multiple pipelines exist', async () => {
const mockPipelinesResponse = [
{
name: 'first',
active: false,
config: {},
status: 'stopped',
errors: [],
components: [],
current: false,
},
{
name: 'second',
active: true,
config: {},
status: 'running',
errors: [],
components: [],
current: true,
},
{
name: 'third',
active: false,
config: {},
status: 'stopped',
errors: [],
components: [],
current: false,
},
];
mockedAxios.get.mockResolvedValueOnce({ data: mockPipelinesResponse });

await client.selectPipeline();

expect(client['selectedPipeline']).toBe('first');
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { plainToInstance } from 'class-transformer';
import { Logger } from '@nestjs/common';
import { RdiUrlV2 } from 'src/modules/rdi/constants';
import {
RdiPipelineInternalServerErrorException,
wrapRdiPipelineError,
} from 'src/modules/rdi/exceptions';
import { RdiInfo } from 'src/modules/rdi/models';

import { ApiRdiClient } from 'src/modules/rdi/client/api/v1/api.rdi.client';
import {
GetInfoResponse,
GetPipelinesResponse,
} from 'src/modules/rdi/client/api/v2/responses';

export class ApiV2RdiClient extends ApiRdiClient {
protected readonly logger = new Logger('ApiV2RdiClient');

protected selectedPipeline = 'default';

/**
* Retrieves comprehensive information about the RDI (Redis Data Integration) instance.
*
* This method is available starting from RDI API v2 and provides detailed metadata
* about the RDI instance including version, status, and configuration details.
*
* @returns {Promise<RdiInfo>} A promise that resolves to an RdiInfo object containing
* instance metadata such as version, status, and capabilities
*
* @example
* const info = await client.getInfo();
* console.log(info.version); // e.g., "1.2.0"
*/
async getInfo(): Promise<RdiInfo> {
try {
const { data } = await this.client.get<GetInfoResponse>(RdiUrlV2.GetInfo);

return plainToInstance(RdiInfo, data);
} catch (e) {
throw wrapRdiPipelineError(e);
}
}

/**
* Selects the active pipeline for subsequent RDI operations.
*
* This method fetches all available pipelines from the RDI instance and automatically
* selects the first pipeline in the list. The selected pipeline is stored in the
* `selectedPipeline` property and will be used for all pipeline-specific operations.
*
* In RDI v2, multiple pipelines can exist, but this implementation currently defaults
* to selecting the first available pipeline. If no pipelines exist, an error is thrown.
*
* @returns {Promise<void>} A promise that resolves when the pipeline is successfully selected
*
* @example
* await client.selectPipeline();
* // client.selectedPipeline is now set to the first available pipeline name
*/
async selectPipeline(): Promise<void> {
try {
const { data } = await this.client.get<GetPipelinesResponse>(
RdiUrlV2.GetPipelines,
);

// todo: handle cases when no pipelines differently
if (!data?.length) {
throw new RdiPipelineInternalServerErrorException(
'Unable to select pipeline',
);
}

this.selectedPipeline = data[0].name;
} catch (e) {
throw wrapRdiPipelineError(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './info.responses';
export * from './pipeline.responses';
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export interface GetInfoResponse {
version: string;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export interface PipelineResponses {
name: string;
active: boolean;
config: any; // todo: define
status: string; // todo: define enum
errors: any[]; // todo: define
components: any[]; // todo: define
current: true;
}

export type GetPipelinesResponse = PipelineResponses[];
5 changes: 5 additions & 0 deletions redisinsight/api/src/modules/rdi/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ export enum RdiUrl {
Action = 'api/v1/actions',
}

export const RdiUrlV2 = {
GetInfo: 'api/v2/info',
GetPipelines: 'api/v2/pipelines',
};

export const IDLE_THRESHOLD = 10 * 60 * 1000; // 10 min
export const RDI_TIMEOUT = 30_000; // 30 sec
export const TOKEN_THRESHOLD = 2 * 60 * 1000; // 2 min
Expand Down
3 changes: 3 additions & 0 deletions redisinsight/api/src/modules/rdi/exceptions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ export * from './rdi-pipeline.internal-server-error.exception';
export * from './rdi-pipeline.not-found.exception';
export * from './rdi-pipeline.unauthorized.exception';
export * from './rdi-pipeline.validation.exception';
export * from './rdi-reset-pipeline-failed.exception';
export * from './rdi-start-pipeline-failed.exception';
export * from './rdi-stop-pipeline-failed.exception';
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
RdiPipelineValidationException,
} from 'src/modules/rdi/exceptions';
import { RdiPipelineForbiddenException } from './rdi-pipeline.forbidden.exception';
import { RdiPipelineBadRequestException } from "src/modules/rdi/exceptions/rdi-pipeline.bad-request.exception";
import { RdiPipelineBadRequestException } from 'src/modules/rdi/exceptions/rdi-pipeline.bad-request.exception';

export const parseErrorMessage = (error: AxiosError<any>): string => {
const data = error.response?.data;
Expand Down
1 change: 1 addition & 0 deletions redisinsight/api/src/modules/rdi/models/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from './rdi';
export * from './rdi-pipeline';
export * from './rdi-dry-run';
export * from './rdi-statistics';
export * from './rdi-info';
11 changes: 11 additions & 0 deletions redisinsight/api/src/modules/rdi/models/rdi-info.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { ApiProperty } from '@nestjs/swagger';
import { Expose } from 'class-transformer';

export class RdiInfo {
@ApiProperty({
description: 'Current RDI collector version',
type: String,
})
@Expose()
version: string;
}
Loading
Loading