diff --git a/README.md b/README.md index e2e42e5..186ad02 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,10 @@ ## Release Notes +#### v1.1.0 + +- Support for sync from DevRev to external system. Known limitations: no support for loading attachments. + #### v1.0.4 - Fix logging from worker threads. diff --git a/package.json b/package.json index ca81519..0a80187 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@devrev/ts-adaas", - "version": "1.0.4", + "version": "1.1.0", "description": "Typescript library containing the ADaaS(AirDrop as a Service) control protocol.", "type": "commonjs", "main": "./dist/index.js", diff --git a/src/common/constants.ts b/src/common/constants.ts index a03cd72..77bb723 100644 --- a/src/common/constants.ts +++ b/src/common/constants.ts @@ -5,9 +5,11 @@ export const STATELESS_EVENT_TYPES = [ EventType.ExtractionMetadataStart, EventType.ExtractionDataDelete, EventType.ExtractionAttachmentsDelete, + EventType.StartDeletingLoaderState, + EventType.StartDeletingLoaderAttachmentsState, ]; -export const ALLOWED_EVENT_TYPES = [ +export const ALLOWED_EXTRACTION_EVENT_TYPES = [ EventType.ExtractionExternalSyncUnitsStart, EventType.ExtractionMetadataStart, EventType.ExtractionDataStart, @@ -18,6 +20,18 @@ export const ALLOWED_EVENT_TYPES = [ EventType.ExtractionAttachmentsDelete, ]; +export const ALLOWED_LOADING_EVENT_TYPES = [ + EventType.StartLoadingData, + EventType.ContinueLoadingData, + EventType.StartDeletingLoaderState, + EventType.StartDeletingLoaderAttachmentsState, +]; + +export const ALLOWED_EVENT_TYPES = [ + ...ALLOWED_EXTRACTION_EVENT_TYPES, + ...ALLOWED_LOADING_EVENT_TYPES, +]; + export const ARTIFACT_BATCH_SIZE = 2000; export const MAX_DEVREV_ARTIFACT_SIZE = 536870912; // 512MB diff --git a/src/common/control-protocol.ts b/src/common/control-protocol.ts index 86c63e8..77ae568 100644 --- a/src/common/control-protocol.ts +++ b/src/common/control-protocol.ts @@ -5,12 +5,14 @@ import { EventData, ExtractorEvent, 
ExtractorEventType, + LoaderEvent, } from '../types/extraction'; +import { LoaderEventType } from '../types/loading'; import { formatAxiosError } from '../logger/logger'; export interface EmitInterface { event: AirdropEvent; - eventType: ExtractorEventType; + eventType: ExtractorEventType | LoaderEventType; data?: EventData; } @@ -19,7 +21,7 @@ export const emit = async ({ eventType, data, }: EmitInterface): Promise => { - const newEvent: ExtractorEvent = { + const newEvent: ExtractorEvent | LoaderEvent = { event_type: eventType, event_context: { uuid: event.payload.event_context.uuid, diff --git a/src/common/helpers.ts b/src/common/helpers.ts index f4eef86..735b4f3 100644 --- a/src/common/helpers.ts +++ b/src/common/helpers.ts @@ -1,7 +1,19 @@ -import { EventType, ExtractorEventType } from '../types/extraction'; +import { + AirdropEvent, + EventType, + ExtractorEventType, +} from '../types/extraction'; +import { + ActionType, + FileToLoad, + ItemTypeToLoad, + LoaderEventType, + LoaderReport, + StatsFileObject, +} from '../types/loading'; -export function getErrorExtractorEventType(eventType: EventType): { - eventType: ExtractorEventType; +export function getTimeoutErrorEventType(eventType: EventType): { + eventType: ExtractorEventType | LoaderEventType; } | null { switch (eventType) { case EventType.ExtractionMetadataStart: @@ -30,11 +42,104 @@ export function getErrorExtractorEventType(eventType: EventType): { return { eventType: ExtractorEventType.ExtractionExternalSyncUnitsError, }; + + case EventType.StartLoadingData: + case EventType.ContinueLoadingData: + return { + eventType: LoaderEventType.DataLoadingError, + }; + case EventType.StartDeletingLoaderState: + case EventType.StartDeletingLoaderAttachmentsState: + return { + eventType: LoaderEventType.LoaderStateDeletionError, + }; default: console.error( - 'Event type not recognized in getTimeoutExtractorEventType function: ' + + 'Event type not recognized in getTimeoutErrorEventType function: ' + eventType 
); return null; } } + +export function getSyncDirection({ event }: { event: AirdropEvent }) { + return event.payload.event_context.mode; +} + +export function getFilesToLoad({ + itemTypesToLoad, + statsFile, +}: { + itemTypesToLoad: ItemTypeToLoad[]; + statsFile: StatsFileObject[]; +}): FileToLoad[] { + const filesToLoad = []; + + if (itemTypesToLoad.length === 0 || statsFile.length === 0) { + return []; + } + + const supportedItemTypes = itemTypesToLoad.map( + (itemTypeToLoad) => itemTypeToLoad.itemType + ); + + const filteredStatsFile = statsFile.filter((file) => + supportedItemTypes.includes(file.item_type) + ); + + const orderedFiles = filteredStatsFile.sort((a, b) => { + const aIndex = supportedItemTypes.indexOf(a.item_type); + const bIndex = supportedItemTypes.indexOf(b.item_type); + + return aIndex - bIndex; + }); + + for (const file of orderedFiles) { + filesToLoad.push({ + id: file.id, + file_name: file.file_name, + itemType: file.item_type, + count: parseInt(file.count), + completed: false, + lineToProcess: 0, + }); + } + + return filesToLoad; +} + +export function addReportToLoaderReport({ + loaderReports, + report, +}: { + loaderReports: LoaderReport[]; + report: LoaderReport; +}): LoaderReport[] { + const existingReport = loaderReports.find( + (loaderReport) => loaderReport.item_type === report.item_type + ); + + if (existingReport) { + existingReport[ActionType.CREATED] = existingReport[ActionType.CREATED] + ? report[ActionType.CREATED] + ? existingReport[ActionType.CREATED] + report[ActionType.CREATED] + : existingReport[ActionType.CREATED] + : report[ActionType.CREATED]; + + existingReport[ActionType.UPDATED] = existingReport[ActionType.UPDATED] + ? report[ActionType.UPDATED] + ? existingReport[ActionType.UPDATED] + report[ActionType.UPDATED] + : existingReport[ActionType.UPDATED] + : report[ActionType.UPDATED]; + + existingReport[ActionType.FAILED] = existingReport[ActionType.FAILED] + ? report[ActionType.FAILED] + ? 
existingReport[ActionType.FAILED] + report[ActionType.FAILED] + : existingReport[ActionType.FAILED] + : report[ActionType.FAILED]; + } else { + loaderReports.push(report); + } + + return loaderReports; +} diff --git a/src/logger/logger.test.ts b/src/logger/logger.test.ts index 5a58698..f6a8a54 100644 --- a/src/logger/logger.test.ts +++ b/src/logger/logger.test.ts @@ -50,5 +50,8 @@ it('formatAxiosError should return formatted error', () => { status: 500, data: 'Internal server error', method: 'GET', + baseURL: undefined, + url: undefined, + payload: undefined, }); }); diff --git a/src/mappers/mappers.interface.ts b/src/mappers/mappers.interface.ts new file mode 100644 index 0000000..c0761ec --- /dev/null +++ b/src/mappers/mappers.interface.ts @@ -0,0 +1,96 @@ +import { AirdropEvent } from '../types'; +import { DonV2 } from '../types/loading'; +import { WorkerAdapterOptions } from '../types/workers'; + +export interface MappersFactoryInterface { + event: AirdropEvent; + options?: WorkerAdapterOptions; +} + +export interface UpdateSyncMapperRecordParams { + external_ids: { + add: string[]; + }; + secondary_ids?: Record; + targets: { + add: DonV2[]; + }; + status: SyncMapperRecordStatus; + input_files?: { + add: string[]; + }; + external_versions?: { + add: SyncMapperRecordExternalVersion[]; + }; + extra_data?: string; +} + +export interface SyncMapperRecord { + id: DonV2; + external_ids: string[]; + secondary_ids?: Record; + targets: DonV2[]; + status: SyncMapperRecordStatus; + input_files?: string[]; + external_versions?: SyncMapperRecordExternalVersion[]; + extra_data?: string; +} + +export interface MappersGetByTargetIdParams { + sync_unit: DonV2; + target: DonV2; +} + +export interface MappersGetByTargetIdResponse { + sync_mapper_record: SyncMapperRecord; +} + +export interface MappersCreateParams { + sync_unit: DonV2; + external_ids: string[]; + secondary_ids?: Record; + targets: DonV2[]; + status: SyncMapperRecordStatus; + input_files?: string[]; + 
external_versions?: SyncMapperRecordExternalVersion[]; + extra_data?: string; +} + +export interface MappersCreateResponse { + sync_mapper_record: SyncMapperRecord; +} + +export interface MappersUpdateParams { + id: DonV2; + sync_unit: DonV2; + external_ids: { + add: string[]; + }; + secondary_ids?: Record; + targets: { + add: DonV2[]; + }; + status: SyncMapperRecordStatus; + input_files?: { + add: string[]; + }; + external_versions?: { + add: SyncMapperRecordExternalVersion[]; + }; + extra_data?: string; +} + +export interface MappersUpdateResponse { + sync_mapper_record: SyncMapperRecord; +} + +export enum SyncMapperRecordStatus { + OPERATIONAL = 'operational', + FILTERED = 'filtered', + IGNORED = 'ignored', +} + +export interface SyncMapperRecordExternalVersion { + recipe_version: number; + modified_date: string; +} diff --git a/src/mappers/mappers.ts b/src/mappers/mappers.ts new file mode 100644 index 0000000..7dcda1c --- /dev/null +++ b/src/mappers/mappers.ts @@ -0,0 +1,64 @@ +import axios, { AxiosResponse } from 'axios'; + +import { + MappersFactoryInterface, + MappersCreateParams, + MappersCreateResponse, + MappersGetByTargetIdParams, + MappersGetByTargetIdResponse, + MappersUpdateParams, + MappersUpdateResponse, +} from './mappers.interface'; + +export class Mappers { + private endpoint: string; + private token: string; + + constructor({ event }: MappersFactoryInterface) { + this.endpoint = event.execution_metadata.devrev_endpoint; + this.token = event.context.secrets.service_account_token; + } + + async getByTargetId( + params: MappersGetByTargetIdParams + ): Promise> { + const { sync_unit, target } = params; + return axios.get( + `${this.endpoint}/internal/airdrop.sync-mapper-record.get-by-target`, + { + headers: { + Authorization: this.token, + }, + params: { sync_unit, target }, + } + ); + } + + async create( + params: MappersCreateParams + ): Promise> { + return axios.post( + `${this.endpoint}/internal/airdrop.sync-mapper-record.create`, + params, + { 
+ headers: { + Authorization: this.token, + }, + } + ); + } + + async update( + params: MappersUpdateParams + ): Promise> { + return axios.post( + `${this.endpoint}/internal/airdrop.sync-mapper-record.update`, + params, + { + headers: { + Authorization: this.token, + }, + } + ); + } +} diff --git a/src/state/state.interfaces.ts b/src/state/state.interfaces.ts index 6ee5e9e..263f68d 100644 --- a/src/state/state.interfaces.ts +++ b/src/state/state.interfaces.ts @@ -1,16 +1,20 @@ import { AirdropEvent } from '../types/extraction'; +import { FileToLoad } from '../types/loading'; import { WorkerAdapterOptions } from '../types/workers'; -/** - * AdapterState is an interface that defines the structure of the adapter state that is used by the external extractor. It extends the connector state with additional fields: lastSyncStarted, lastSuccessfulSyncStarted, and attachmentsMetadata. - */ -export type AdapterState = ConnectorState & { +export interface SdkState { lastSyncStarted?: string; lastSuccessfulSyncStarted?: string; toDevRev?: ToDevRev; fromDevRev?: FromDevRev; }; + +/** + * AdapterState is an interface that defines the structure of the adapter state that is used by the external extractor. It extends the connector state with additional fields: lastSyncStarted, lastSuccessfulSyncStarted, and attachmentsMetadata. 
+ */ +export type AdapterState = ConnectorState & SdkState; + export interface ToDevRev { attachmentsMetadata: { artifactIds: string[]; @@ -18,7 +22,9 @@ export interface ToDevRev { }; } -export interface FromDevRev {} +export interface FromDevRev { + filesToLoad: FileToLoad[]; +} export interface StateInterface { event: AirdropEvent; diff --git a/src/state/state.ts b/src/state/state.ts index 823c961..34eee67 100644 --- a/src/state/state.ts +++ b/src/state/state.ts @@ -1,11 +1,12 @@ import axios from 'axios'; -import { AirdropEvent } from '../types/extraction'; +import { AirdropEvent, SyncMode } from '../types/extraction'; import { STATELESS_EVENT_TYPES } from '../common/constants'; import { formatAxiosError, getPrintableState } from '../logger/logger'; import { ErrorRecord } from '../types/common'; -import { AdapterState, StateInterface } from './state.interfaces'; +import { AdapterState, SdkState, StateInterface } from './state.interfaces'; +import { getSyncDirection } from '../common/helpers'; export async function createAdapterState({ event, @@ -29,23 +30,32 @@ export async function createAdapterState({ export class State { private _state: AdapterState; + private initialSdkState: SdkState; private event: AirdropEvent; private workerUrl: string; private devrevToken: string; constructor({ event, initialState }: StateInterface) { + this.initialSdkState = + getSyncDirection({ event }) === SyncMode.LOADING + ? 
{ + fromDevRev: { + filesToLoad: [], + }, + } + : { + lastSyncStarted: new Date().toISOString(), + lastSuccessfulSyncStarted: '', + toDevRev: { + attachmentsMetadata: { + artifactIds: [], + lastProcessed: 0, + }, + }, + }; this._state = { ...initialState, - lastSyncStarted: new Date().toISOString(), - lastSuccessfulSyncStarted: '', - - fromDevRev: { - attachmentsMetadata: { - artifactIds: [], - lastProcessed: 0, - }, - }, - toDevRev: {}, + ...this.initialSdkState, } as AdapterState; this.event = event; @@ -139,15 +149,7 @@ export class State { if (axios.isAxiosError(error) && error.response?.status === 404) { const state: AdapterState = { ...initialState, - lastSyncStarted: new Date().toISOString(), - lastSuccessfulSyncStarted: '', - toDevRev: { - attachmentsMetadata: { - artifactIds: [], - lastProcessed: 0, - }, - }, - fromDevRev: {}, + ...this.initialSdkState, }; this.state = state; diff --git a/src/tests/from_devrev/loading.test.ts b/src/tests/from_devrev/loading.test.ts new file mode 100644 index 0000000..6eecfb3 --- /dev/null +++ b/src/tests/from_devrev/loading.test.ts @@ -0,0 +1,153 @@ +import { getFilesToLoad } from '../../common/helpers'; +import { ItemTypeToLoad, StatsFileObject } from '../../types/loading'; + +describe('getFilesToLoad', () => { + let statsFile: StatsFileObject[]; + + beforeEach(() => { + statsFile = [ + { + id: 'don:core:dvrv-us-1:devo/1:artifact/2', + file_name: 'transformer_issues_X.json.gz', + item_type: 'issues', + count: '79', + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/5', + file_name: 'transformer_issues_X.json.gz', + item_type: 'comments', + count: '1079', + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/9', + file_name: 'transformer_issues_X.json.gz', + item_type: 'issues', + count: '1921', + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/14', + file_name: 'transformer_issues_X.json.gz', + item_type: 'comments', + count: '921', + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/99', + file_name: 
'transformer_issues_X.json.gz', + item_type: 'attachments', + count: '50', + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/99', + file_name: 'transformer_issues_X.json.gz', + item_type: 'unknown', + count: '50', + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/99', + file_name: 'transformer_issues_X.json.gz', + item_type: 'issues', + count: '32', + }, + ]; + }); + + it('should return an empty array if statsFile is empty', () => { + statsFile = []; + const itemTypesToLoad: ItemTypeToLoad[] = []; + const result = getFilesToLoad({ itemTypesToLoad, statsFile }); + expect(result).toEqual([]); + }); + + it('should return an empty array if itemTypesToLoad is empty', () => { + const itemTypesToLoad: ItemTypeToLoad[] = []; + const result = getFilesToLoad({ itemTypesToLoad, statsFile }); + expect(result).toEqual([]); + }); + + it('should return an empty array if statsFile has no matching items', () => { + const itemTypesToLoad: ItemTypeToLoad[] = [ + { itemType: 'users', create: jest.fn(), update: jest.fn() }, + ]; + const result = getFilesToLoad({ itemTypesToLoad, statsFile }); + expect(result).toEqual([]); + }); + + it('should filter out files not in itemTypesToLoad and order them by itemTypesToLoad', () => { + const itemTypesToLoad: ItemTypeToLoad[] = [ + { itemType: 'attachments', create: jest.fn(), update: jest.fn() }, + { itemType: 'issues', create: jest.fn(), update: jest.fn() }, + ]; + const result = getFilesToLoad({ itemTypesToLoad, statsFile }); + expect(result).toEqual([ + { + id: 'don:core:dvrv-us-1:devo/1:artifact/99', + itemType: 'attachments', + count: 50, + file_name: 'transformer_issues_X.json.gz', + completed: false, + lineToProcess: 0, + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/2', + itemType: 'issues', + count: 79, + file_name: 'transformer_issues_X.json.gz', + completed: false, + lineToProcess: 0, + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/9', + itemType: 'issues', + count: 1921, + file_name: 'transformer_issues_X.json.gz', + 
completed: false, + lineToProcess: 0, + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/99', + itemType: 'issues', + count: 32, + file_name: 'transformer_issues_X.json.gz', + completed: false, + lineToProcess: 0, + }, + ]); + }); + + it('should ignore files with unrecognized item types in statsFile', () => { + const itemTypesToLoad: ItemTypeToLoad[] = [ + { itemType: 'issues', create: jest.fn(), update: jest.fn() }, + ]; + const result = getFilesToLoad({ + itemTypesToLoad, + statsFile, + }); + + expect(result).toEqual([ + { + id: 'don:core:dvrv-us-1:devo/1:artifact/2', + itemType: 'issues', + count: 79, + file_name: 'transformer_issues_X.json.gz', + completed: false, + lineToProcess: 0, + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/9', + itemType: 'issues', + count: 1921, + file_name: 'transformer_issues_X.json.gz', + completed: false, + lineToProcess: 0, + }, + { + id: 'don:core:dvrv-us-1:devo/1:artifact/99', + itemType: 'issues', + count: 32, + file_name: 'transformer_issues_X.json.gz', + completed: false, + lineToProcess: 0, + }, + ]); + }); +}); diff --git a/src/tests/test-helpers.ts b/src/tests/test-helpers.ts index 69c6eaf..2d33bb9 100644 --- a/src/tests/test-helpers.ts +++ b/src/tests/test-helpers.ts @@ -31,21 +31,27 @@ export function createEvent({ key_type: 'test_key_type', }, event_context: { - mode: 'test_mode', callback_url: 'test_callback_url', + dev_org: 'test_dev_org', dev_org_id: 'test_dev_org_id', + dev_user: 'test_dev_user', dev_user_id: 'test_dev_user_id', + external_sync_unit: 'test_external_sync_unit', external_sync_unit_id: 'test_external_sync_unit_id', - sync_unit_id: 'test_sync_unit_id', - sync_run_id: 'test_sync_run_id', - external_system_id: 'test_external_system_id', - uuid: 'test_uuid', - worker_data_url: 'test_worker_data_url', + external_sync_unit_name: 'test_external_sync_unit_name', external_system: 'test_external_system', external_system_type: 'test_external_system_type', import_slug: 'test_import_slug', + mode: 'test_mode', + 
request_id: 'test_request_id', snap_in_slug: 'test_snap_in_slug', + sync_run: 'test_sync_run', + sync_run_id: 'test_sync_run_id', sync_tier: 'test_sync_tier', + sync_unit: 'test_sync_unit', + sync_unit_id: 'test_sync_unit_id', + uuid: 'test_uuid', + worker_data_url: 'test_worker_data_url', }, event_type: eventType, event_data: { diff --git a/src/types/extraction.ts b/src/types/extraction.ts index 35aa119..11197cb 100644 --- a/src/types/extraction.ts +++ b/src/types/extraction.ts @@ -4,11 +4,14 @@ import { Artifact } from '../uploader/uploader.interfaces'; import { ErrorRecord } from './common'; +import { DonV2, LoaderReport } from './loading'; + /** * EventType is an enum that defines the different types of events that can be sent to the external extractor from ADaaS. * The external extractor can use these events to know what to do next in the extraction process. */ export enum EventType { + // Extraction ExtractionExternalSyncUnitsStart = 'EXTRACTION_EXTERNAL_SYNC_UNITS_START', ExtractionMetadataStart = 'EXTRACTION_METADATA_START', ExtractionDataStart = 'EXTRACTION_DATA_START', @@ -17,6 +20,12 @@ export enum EventType { ExtractionAttachmentsStart = 'EXTRACTION_ATTACHMENTS_START', ExtractionAttachmentsContinue = 'EXTRACTION_ATTACHMENTS_CONTINUE', ExtractionAttachmentsDelete = 'EXTRACTION_ATTACHMENTS_DELETE', + + // Loading + StartLoadingData = 'START_LOADING_DATA', + ContinueLoadingData = 'CONTINUE_LOADING_DATA', + StartDeletingLoaderState = 'START_DELETING_LOADER_STATE', + StartDeletingLoaderAttachmentsState = 'START_DELETING_LOADER_ATTACHMENTS_STATE', } /** @@ -24,6 +33,7 @@ export enum EventType { * The external extractor can use these events to inform ADaaS about the progress of the extraction process. 
*/ export enum ExtractorEventType { + // Extraction ExtractionExternalSyncUnitsDone = 'EXTRACTION_EXTERNAL_SYNC_UNITS_DONE', ExtractionExternalSyncUnitsError = 'EXTRACTION_EXTERNAL_SYNC_UNITS_ERROR', ExtractionMetadataDone = 'EXTRACTION_METADATA_DONE', @@ -40,9 +50,13 @@ export enum ExtractorEventType { ExtractionAttachmentsError = 'EXTRACTION_ATTACHMENTS_ERROR', ExtractionAttachmentsDeleteDone = 'EXTRACTION_ATTACHMENTS_DELETE_DONE', ExtractionAttachmentsDeleteError = 'EXTRACTION_ATTACHMENTS_DELETE_ERROR', + + // Unknown + UnknownEventType = 'UNKNOWN_EVENT_TYPE', } /** + * @deprecated * ExtractionMode is an enum that defines the different modes of extraction that can be used by the external extractor. * It can be either INITIAL or INCREMENTAL. INITIAL mode is used for the first/initial import, while INCREMENTAL mode is used for doing syncs. */ @@ -51,6 +65,16 @@ export enum ExtractionMode { INCREMENTAL = 'INCREMENTAL', } +/** + * SyncMode is an enum that defines the different sync modes that can be set in the event context. + * It can be INITIAL, INCREMENTAL or LOADING. INITIAL mode is used for the first/initial import, INCREMENTAL mode is used for doing syncs, and LOADING mode is used for loading data from DevRev to the external system. + */ +export enum SyncMode { + INITIAL = 'INITIAL', + INCREMENTAL = 'INCREMENTAL', + LOADING = 'LOADING', +} + /** * ExternalSyncUnit is an interface that defines the structure of an external sync unit (repos, projects, ...) that can be extracted. * It must contain an ID, a name, and a description. It can also contain the number of items in the external sync unit. @@ -67,21 +91,27 @@ export interface ExternalSyncUnit { * EventContextIn is an interface that defines the structure of the input event context that is sent to the external extractor from ADaaS. 
*/ export interface EventContextIn { - mode: string; callback_url: string; + dev_org: string; dev_org_id: string; + dev_user: string; dev_user_id: string; - external_sync_unit_id?: string; - sync_unit_id?: string; - sync_run_id: string; - external_system_id: string; - uuid: string; - worker_data_url: string; + external_sync_unit: string; + external_sync_unit_id: string; + external_sync_unit_name: string; external_system: string; external_system_type: string; import_slug: string; + mode: string; + request_id: string; snap_in_slug: string; + sync_run: string; + sync_run_id: string; sync_tier: string; + sync_unit: DonV2; + sync_unit_id: string; + uuid: string; + worker_data_url: string; } /** @@ -116,6 +146,11 @@ export interface EventData { * @deprecated This field is deprecated and should not be used. */ artifacts?: Artifact[]; + + // TODO: Probably this should be moved somewhere else and required in case of specific event types + reports?: LoaderReport[]; + processed_files?: string[]; + stats_file?: string; } /** @@ -171,3 +206,12 @@ export interface ExtractorEvent { event_context: EventContextOut; event_data?: EventData; } + +/** + * LoaderEvent + */ +export interface LoaderEvent { + event_type: string; + event_context: EventContextOut; + event_data?: EventData; +} diff --git a/src/types/index.ts b/src/types/index.ts index 32dd478..325696e 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -21,8 +21,17 @@ export { AirdropEvent, AirdropMessage, ExtractorEvent, + SyncMode, } from './extraction'; +// Loading +export { + LoaderEventType, + ExternalSystemItem, + ExternalSystemItemLoadingResponse, + ExternalSystemItemLoadingParams, +} from './loading'; + // Repo export { NormalizedItem, diff --git a/src/types/loading.ts b/src/types/loading.ts new file mode 100644 index 0000000..f84824a --- /dev/null +++ b/src/types/loading.ts @@ -0,0 +1,118 @@ +import { AirdropEvent } from './extraction'; +import { Mappers } from '../mappers/mappers'; +import { ErrorRecord } 
from './common'; + +export interface StatsFileObject { + id: string; + item_type: string; + file_name: string; + count: string; +} + +export interface FileToLoad { + id: string; + file_name: string; + itemType: string; + count: number; + lineToProcess: number; + completed: boolean; +} + +export interface ExternalSystemItem { + id: { + devrev: DonV2; + external?: string; + }; + created_date: string; + modified_date: string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + data: any; +} + +export interface ExternalSystemItemLoadingParams { + item: ExternalSystemItem; + mappers: Mappers; + event: AirdropEvent; +} + +export interface ExternalSystemItemLoadingResponse { + id?: string; + error?: string; + modifiedDate?: string; + delay?: number; +} + +export interface ExternalSystemItemLoadedItem { + id?: string; + error?: string; + modifiedDate?: string; +} + +export type ExternalSystemLoadingFunction = ({ + item, + mappers, + event, +}: ExternalSystemItemLoadingParams) => Promise; + +export interface ItemTypeToLoad { + itemType: string; + create: ExternalSystemLoadingFunction; + update: ExternalSystemLoadingFunction; + // requiresSecondPass: boolean; +} + +export interface ItemTypesToLoadParams { + itemTypesToLoad: ItemTypeToLoad[]; +} + +export interface LoaderReport { + item_type: string; + [ActionType.CREATED]?: number; + [ActionType.UPDATED]?: number; + [ActionType.SKIPPED]?: number; + [ActionType.DELETED]?: number; + [ActionType.FAILED]?: number; +} + +export interface RateLimited { + delay: number; +} + +export interface LoadItemResponse { + error?: ErrorRecord; + report?: LoaderReport; + rateLimit?: RateLimited; +} + +export interface LoadItemTypesResponse { + reports: LoaderReport[]; + processed_files: string[]; +} + +export enum ActionType { + CREATED = 'created', + UPDATED = 'updated', + SKIPPED = 'skipped', + DELETED = 'deleted', + FAILED = 'failed', +} + +export type DonV2 = string; + +export type SyncMapperRecord = { + external_ids: 
string[]; + secondary_ids: string[]; + devrev_ids: string[]; + status: string[]; + input_file?: string; +}; + +export enum LoaderEventType { + DataLoadingProgress = 'DATA_LOADING_PROGRESS', + DataLoadingDelay = 'DATA_LOADING_DELAYED', + DataLoadingDone = 'DATA_LOADING_DONE', + DataLoadingError = 'DATA_LOADING_ERROR', + LoaderStateDeletionDone = 'LOADER_STATE_DELETION_DONE', + LoaderStateDeletionError = 'LOADER_STATE_DELETION_ERROR', + UnknownEventType = 'UNKNOWN_EVENT_TYPE', +} diff --git a/src/types/workers.ts b/src/types/workers.ts index 7ad0f20..c4785dc 100644 --- a/src/types/workers.ts +++ b/src/types/workers.ts @@ -6,6 +6,8 @@ import { WorkerAdapter } from '../workers/worker-adapter'; import { ExtractorEventType, AirdropEvent } from './extraction'; +import { LoaderEventType } from './loading'; + /** * WorkerAdapterInterface is an interface for WorkerAdapter class. * @interface WorkerAdapterInterface @@ -117,7 +119,7 @@ export enum WorkerMessageSubject { export interface WorkerMessageEmitted { subject: WorkerMessageSubject.WorkerMessageEmitted; payload: { - eventType: ExtractorEventType; + eventType: ExtractorEventType | LoaderEventType; }; } diff --git a/src/uploader/uploader.interfaces.ts b/src/uploader/uploader.interfaces.ts index 2dd8d1d..498cf1b 100644 --- a/src/uploader/uploader.interfaces.ts +++ b/src/uploader/uploader.interfaces.ts @@ -1,6 +1,7 @@ import { AirdropEvent } from '../types/extraction'; import { WorkerAdapterOptions } from '../types/workers'; import { ErrorRecord } from '../types/common'; +import { ExternalSystemItem, StatsFileObject } from '../types/loading'; export interface UploaderFactoryInterface { event: AirdropEvent; @@ -68,3 +69,13 @@ export interface SsorAttachment { external: string; }; } + +export interface StatsFileResponse { + error?: ErrorRecord; + statsFile?: StatsFileObject[]; +} + +export interface TransformerFileResponse { + error?: ErrorRecord; + transformerFile?: ExternalSystemItem[]; +} diff --git 
a/src/uploader/uploader.ts b/src/uploader/uploader.ts index accd753..a297aea 100644 --- a/src/uploader/uploader.ts +++ b/src/uploader/uploader.ts @@ -393,6 +393,37 @@ export class Uploader { } } + async getJsonObjectByArtifactId({ + artifactId, + isGzipped = false, + }: { + artifactId: string; + isGzipped?: boolean; + }): Promise { + const artifactUrl = await this.getArtifactDownloadUrl(artifactId); + if (!artifactUrl) { + return; + } + + const artifact = await this.downloadArtifact(artifactUrl); + if (!artifact) { + return; + } + + if (isGzipped) { + const decompressedArtifact = await this.decompressGzip(artifact); + if (!decompressedArtifact) { + return; + } + + const jsonlObject = Buffer.from(decompressedArtifact).toString('utf-8'); + return jsonl.parse(jsonlObject); + } + + const jsonlObject = Buffer.from(artifact).toString('utf-8'); + return jsonl.parse(jsonlObject); + } + private async downloadToLocal( itemType: string, fetchedObjects: object | object[] diff --git a/src/workers/create-worker.ts b/src/workers/create-worker.ts index 3a64516..9e7a202 100644 --- a/src/workers/create-worker.ts +++ b/src/workers/create-worker.ts @@ -18,8 +18,8 @@ async function createWorker( workerData, } as WorkerOptions); - worker.on(WorkerEvent.WorkerError, () => { - logger.error('Worker error'); + worker.on(WorkerEvent.WorkerError, (error) => { + logger.error('Worker error', error); reject(); }); worker.on(WorkerEvent.WorkerOnline, () => { diff --git a/src/workers/default-workers/data-loading.ts b/src/workers/default-workers/data-loading.ts new file mode 100644 index 0000000..e3a1762 --- /dev/null +++ b/src/workers/default-workers/data-loading.ts @@ -0,0 +1,18 @@ +import { processTask } from 'workers/process-task'; +import { LoaderEventType } from '../../types/loading'; + +processTask({ + task: async ({ adapter }) => { + await adapter.emit(LoaderEventType.DataLoadingDone, { + reports: adapter.reports, + processed_files: adapter.processedFiles, + }); + }, + onTimeout: async ({ 
adapter }) => { + await adapter.postState(); + await adapter.emit(LoaderEventType.DataLoadingProgress, { + reports: adapter.reports, + processed_files: adapter.processedFiles, + }); + }, +}); diff --git a/src/workers/spawn.ts b/src/workers/spawn.ts index 987a539..3eda5cb 100644 --- a/src/workers/spawn.ts +++ b/src/workers/spawn.ts @@ -1,8 +1,12 @@ import { Worker } from 'node:worker_threads'; -import { AirdropEvent, EventType } from '../types/extraction'; +import { + AirdropEvent, + EventType, + ExtractorEventType, +} from '../types/extraction'; import { emit } from '../common/control-protocol'; -import { getErrorExtractorEventType } from '../common/helpers'; +import { getTimeoutErrorEventType } from '../common/helpers'; import { Logger } from '../logger/logger'; import { ALLOWED_EVENT_TYPES } from '../common/constants'; import { @@ -20,16 +24,14 @@ import { LogLevel } from '../logger/logger.interfaces'; function getWorkerPath({ event, connectorWorkerPath, - options, }: GetWorkerPathInterface): string | null { - const logger = new Logger({ event, options }); - if (!ALLOWED_EVENT_TYPES.includes(event.payload.event_type)) { return null; } if (connectorWorkerPath) return connectorWorkerPath; let path = null; switch (event.payload.event_type) { + // Extraction case EventType.ExtractionExternalSyncUnitsStart: path = __dirname + '/default-workers/external-sync-units-extraction'; break; @@ -45,18 +47,31 @@ function getWorkerPath({ path = __dirname + '/default-workers/attachments-extraction'; break; case EventType.ExtractionDataDelete: - path = __dirname + '/default-workers/data-deletion.js'; + path = __dirname + '/default-workers/data-deletion'; break; case EventType.ExtractionAttachmentsDelete: path = __dirname + '/default-workers/attachments-deletion'; break; + + // Loading + case EventType.StartLoadingData: + case EventType.ContinueLoadingData: + path = __dirname + '/default-workers/data-loading'; + break; + default: - logger.error( - 'Worker script not found for event 
type: ' + - event.payload.event_type + - '.' - ); - path = null; + emit({ + event, + eventType: ExtractorEventType.UnknownEventType, + data: { + error: { + message: + 'Unrecognized event type in spawn ' + + event.payload.event_type + + '.', + }, + }, + }); } return path; } @@ -81,6 +96,7 @@ export async function spawn({ boolean | PromiseLike > { const logger = new Logger({ event, options }); + const script = getWorkerPath({ event, connectorWorkerPath: workerPath, @@ -179,7 +195,7 @@ export class Spawn { return; } - const timeoutEventType = getErrorExtractorEventType( + const timeoutEventType = getTimeoutErrorEventType( this.event.payload.event_type ); if (timeoutEventType !== null) { diff --git a/src/workers/worker-adapter.ts b/src/workers/worker-adapter.ts index cb208cc..7c4e900 100644 --- a/src/workers/worker-adapter.ts +++ b/src/workers/worker-adapter.ts @@ -1,12 +1,16 @@ +import axios from 'axios'; import { AirdropEvent, ExtractorEventType, EventData, + EventType, } from '../types/extraction'; +import { ActionType, LoaderEventType, StatsFileObject } from '../types/loading'; import { AdapterState } from '../state/state.interfaces'; import { Artifact } from '../uploader/uploader.interfaces'; import { AIRDROP_DEFAULT_ITEM_TYPES, + ALLOWED_EXTRACTION_EVENT_TYPES, STATELESS_EVENT_TYPES, } from '../common/constants'; import { State } from '../state/state'; @@ -16,6 +20,19 @@ import { emit } from '../common/control-protocol'; import { WorkerMessageEmitted, WorkerMessageSubject } from '../types/workers'; import { Repo } from '../repo/repo'; import { RepoInterface } from '../repo/repo.interfaces'; +import { + ExternalSystemItem, + ItemTypesToLoadParams, + ItemTypeToLoad, + LoaderReport, + LoadItemResponse, + LoadItemTypesResponse, +} from '../types/loading'; +import { addReportToLoaderReport, getFilesToLoad } from '../common/helpers'; +import { Mappers } from '../mappers/mappers'; +import { Uploader } from '../uploader/uploader'; +import { formatAxiosError } from 
'../logger/logger'; +import { SyncMapperRecordStatus } from '../mappers/mappers.interface'; export function createWorkerAdapter({ event, @@ -56,6 +73,12 @@ export class WorkerAdapter { private isTimeout: boolean; private repos: Repo[] = []; + // Loader + private loaderReports: LoaderReport[]; + private _processedFiles: string[]; + private mappers: Mappers; + private uploader: Uploader; + constructor({ event, adapterState, @@ -69,6 +92,18 @@ export class WorkerAdapter { this.parentPort = parentPort; this.hasWorkerEmitted = false; this.isTimeout = false; + + // Loader + this.loaderReports = []; + this._processedFiles = []; + this.mappers = new Mappers({ + event, + options, + }); + this.uploader = new Uploader({ + event, + options, + }); } get state(): AdapterState { @@ -81,6 +116,14 @@ export class WorkerAdapter { } } + get reports(): LoaderReport[] { + return this.loaderReports; + } + + get processedFiles(): string[] { + return this._processedFiles; + } + initializeRepos(repos: RepoInterface[]) { this.repos = repos.map((repo) => { const shouldNormalize = !Object.values( @@ -138,7 +181,7 @@ export class WorkerAdapter { * @param {EventData=} data - The data to be sent with the event */ async emit( - newEventType: ExtractorEventType, + newEventType: ExtractorEventType | LoaderEventType, data?: EventData ): Promise { if (this.hasWorkerEmitted) { @@ -174,7 +217,14 @@ export class WorkerAdapter { await emit({ eventType: newEventType, event: this.event, - data: { ...data, artifacts: this.artifacts }, + data: { + ...data, + ...(ALLOWED_EXTRACTION_EVENT_TYPES.includes( + this.event.payload.event_type + ) + ? 
{ artifacts: this.artifacts } + : {}), + }, }); const message: WorkerMessageEmitted = { subject: WorkerMessageSubject.WorkerMessageEmitted, @@ -200,4 +250,367 @@ export class WorkerAdapter { handleTimeout() { this.isTimeout = true; } + + async loadItemTypes({ + itemTypesToLoad, + }: ItemTypesToLoadParams): Promise { + const statsFileArtifactId = this.event.payload.event_data?.stats_file; + + if (!statsFileArtifactId) { + console.error('Stats file artifact id not found in event data.'); + await emit({ + event: this.event, + eventType: LoaderEventType.DataLoadingError, + data: { + error: { + message: 'Stats file artifact id not found in event data.', + }, + }, + }); + + return { + reports: this.reports, + processed_files: this.processedFiles, + }; + } + + const statsFile = (await this.uploader.getJsonObjectByArtifactId({ + artifactId: statsFileArtifactId, + })) as StatsFileObject[]; + + console.log('Stats file', statsFile); + + if (!statsFile || statsFile.length === 0) { + console.warn('Stats file not found or empty.'); + return { + reports: this.reports, + processed_files: this.processedFiles, + }; + } + + if (this.event.payload.event_type === EventType.StartLoadingData) { + console.log( + 'Recieved event type ' + + EventType.StartLoadingData + + '. Preparing files to load.' 
+ ); + const filesToLoad = getFilesToLoad({ + itemTypesToLoad, + statsFile, + }); + + if (filesToLoad.length === 0) { + console.warn('No files to load, returning.'); + return { + reports: this.reports, + processed_files: this.processedFiles, + }; + } + + this.adapterState.state.fromDevRev = { filesToLoad }; + } + + console.log( + 'Files to load in state', + this.adapterState.state.fromDevRev?.filesToLoad + ); + + if (!this.adapterState.state.fromDevRev) { + await emit({ + event: this.event, + eventType: LoaderEventType.DataLoadingError, + data: { + error: { + message: 'Unexpected state set in LOAD_DATA.', + }, + }, + }); + return { + reports: this.reports, + processed_files: this.processedFiles, + }; + } + outerloop: for (const fileToLoad of this.adapterState.state.fromDevRev + ?.filesToLoad) { + const itemTypeToLoad = itemTypesToLoad.find( + (itemTypeToLoad: ItemTypeToLoad) => + itemTypeToLoad.itemType === fileToLoad.itemType + ); + + if (!itemTypeToLoad) { + console.error( + `Item type to load not found for item type: ${fileToLoad.itemType}.` + ); + + await emit({ + event: this.event, + eventType: LoaderEventType.DataLoadingError, + data: { + error: { + message: `Item type to load not found for item type: ${fileToLoad.itemType}.`, + }, + }, + }); + + break; + } + + if (!fileToLoad.completed) { + const transformerFile = (await this.uploader.getJsonObjectByArtifactId({ + artifactId: fileToLoad.id, + isGzipped: true, + })) as ExternalSystemItem[]; + + if (!transformerFile) { + console.error('Transformer file not found.'); + break outerloop; + } + + for (let i = fileToLoad.lineToProcess; i < fileToLoad.count; i++) { + const { report, rateLimit, error } = await this.loadItem({ + item: transformerFile[i], + itemTypeToLoad, + }); + + if (error) { + await emit({ + event: this.event, + eventType: LoaderEventType.DataLoadingError, + data: { + error, + }, + }); + + break outerloop; + } + + if (rateLimit?.delay) { + await emit({ + event: this.event, + eventType: 
LoaderEventType.DataLoadingDelay, + data: { + delay: rateLimit.delay, + reports: this.reports, + processed_files: this.processedFiles, + }, + }); + + break outerloop; + } + + if (report) { + addReportToLoaderReport({ + loaderReports: this.loaderReports, + report, + }); + fileToLoad.lineToProcess = fileToLoad.lineToProcess + 1; + } + } + + fileToLoad.completed = true; + this._processedFiles.push(fileToLoad.id); + } + } + + return { + reports: this.reports, + processed_files: this.processedFiles, + }; + } + + async loadItem({ + item, + itemTypeToLoad, + }: { + item: ExternalSystemItem; + itemTypeToLoad: ItemTypeToLoad; + }): Promise { + const devrevId = item.id.devrev; + + try { + const syncMapperRecordResponse = await this.mappers.getByTargetId({ + sync_unit: this.event.payload.event_context.sync_unit, + target: devrevId, + }); + + const syncMapperRecord = syncMapperRecordResponse.data; + if (!syncMapperRecord) { + console.error('Failed to get sync mapper record from response.'); + return { + error: { + message: 'Failed to get sync mapper record from response.', + }, + }; + } + + // Update item + const { id, modifiedDate, delay, error } = await itemTypeToLoad.update({ + item, + mappers: this.mappers, + event: this.event, + }); + + if (id) { + if (modifiedDate) { + try { + const updateSyncMapperRecordResponse = await this.mappers.update({ + id: syncMapperRecord.sync_mapper_record.id, + sync_unit: this.event.payload.event_context.sync_unit, + status: SyncMapperRecordStatus.OPERATIONAL, + external_versions: { + add: [ + { + modified_date: modifiedDate, + recipe_version: 0, + }, + ], + }, + external_ids: { + add: [id], + }, + targets: { + add: [devrevId], + }, + }); + + console.log( + 'Updated sync mapper record', + JSON.stringify(updateSyncMapperRecordResponse.data) + ); + } catch (error) { + if (axios.isAxiosError(error)) { + console.error( + 'Failed to update sync mapper record', + formatAxiosError(error) + ); + return { + error: { + message: error.message, + }, + }; 
+ } else { + console.error('Failed to update sync mapper record', error); + return { + error: { + message: 'Failed to update sync mapper record' + error, + }, + }; + } + } + } + + return { + report: { + item_type: itemTypeToLoad.itemType, + [ActionType.UPDATED]: 1, + }, + }; + } else if (delay) { + console.log('Rate limited, delaying for', delay); + + return { + rateLimit: { + delay, + }, + }; + } else { + console.error('Failed to update item', error); + return { + report: { + item_type: itemTypeToLoad.itemType, + [ActionType.FAILED]: 1, + }, + }; + } + + // Update mapper (optional) + } catch (error) { + if (axios.isAxiosError(error)) { + if (error.response?.status === 404) { + // Create item + const { id, delay, error } = await itemTypeToLoad.create({ + item, + mappers: this.mappers, + event: this.event, + }); + + if (id) { + // Create mapper + try { + const createdSyncMapperRecordResponse = await this.mappers.create( + { + sync_unit: this.event.payload.event_context.sync_unit, + status: SyncMapperRecordStatus.OPERATIONAL, + external_ids: [id], + targets: [devrevId], + } + ); + + console.log( + 'Created sync mapper record', + createdSyncMapperRecordResponse.data.sync_mapper_record.id + ); + + return { + report: { + item_type: itemTypeToLoad.itemType, + [ActionType.CREATED]: 1, + }, + }; + } catch (error) { + if (axios.isAxiosError(error)) { + console.error( + 'Failed to create sync mapper record', + formatAxiosError(error) + ); + return { + error: { + message: error.message, + }, + }; + } + + console.error('Failed to create sync mapper record', error); + return { + error: { + message: 'Failed to create sync mapper record' + error, + }, + }; + } + } else if (delay) { + return { + rateLimit: { + delay, + }, + }; + } else { + console.error('Failed to create item', error); + return { + report: { + item_type: itemTypeToLoad.itemType, + [ActionType.FAILED]: 1, + }, + }; + } + } else { + console.error( + 'Failed to get sync mapper record', + formatAxiosError(error) + ); 
+ return { + error: { + message: error.message, + }, + }; + } + } + + console.error('Failed to get sync mapper record', error); + return { + error: { + message: 'Failed to get sync mapper record' + error, + }, + }; + } + } }