diff --git a/README.md b/README.md index 7bc9977..7a2e7eb 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,21 @@ Typescript ADaaS Library (@devrev/ts-adaas) provides: - an adapter for ADaaS control protocol, - helpers for uploading artifacts and manage the state for ADaaS snap-in. +## Release Notes + +#### v0.0.2 + +- Support for the State API +- HTTP client for API requests +- Local development environment creates local artifact files +- Improvements in logging + +#### v0.0.1 + +- Demo implementation of ADaaS snap-in +- Adapter for ADaaS control protocol with helper functions +- Uploader for uploading artifacts + ## Usage Create a new ADaaS adapter on each ADaaS snap-in invocation: diff --git a/package-lock.json b/package-lock.json index 806d186..316edf3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@devrev/ts-adaas", - "version": "0.0.1", + "version": "0.0.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@devrev/ts-adaas", - "version": "0.0.1", + "version": "0.0.2", "license": "ISC", "dependencies": { "@devrev/typescript-sdk": "^1.1.27", diff --git a/package.json b/package.json index 74de9cf..bdf80c7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@devrev/ts-adaas", - "version": "0.0.1", + "version": "0.0.2", "description": "Typescript library containing the ADaaS(AirDrop as a Service) control protocol.", "type": "commonjs", "main": "./dist/src/index.js", diff --git a/src/adapter/index.test.ts b/src/adapter/index.test.ts deleted file mode 100644 index c346a0a..0000000 --- a/src/adapter/index.test.ts +++ /dev/null @@ -1,133 +0,0 @@ -import { Context, ExecutionMetadata } from '@devrev/typescript-sdk/dist/snap-ins'; -import { AirdropEvent, EventType } from '../types'; -import { Adapter } from './index'; - -jest.useFakeTimers(); - -const defaultEvent: AirdropEvent = { - context: { - secrets: { - service_account_token: 'mockToken', - }, - } as unknown as Context, - payload: { - connection_data: { - org_id: 'mockOrgId', - org_name: 'mockOrgName', - key: 'mockKey', - key_type: 'mockKeyType', - }, - event_context: { - mode: 'INITIAL', - uuid: 'mockUuid', - callback_url: 'mockCallbackUrl', - dev_org_id: 'DEV-TESTORG', - dev_user_id: 'DEV-TESTUSER', - external_system_id: 'TESTSYSTEM', - sync_run_id: 'mockSyncRunId', - }, - event_type: EventType.ExtractionExternalSyncUnitsStart, - }, - execution_metadata: { - devrev_endpoint: 'http://api.dev.devrev-eng.ai', - } as ExecutionMetadata, - input_data: { - global_values:{}, - event_sources: {}, - }, -}; - -describe('Adapter', () => { - let adapter: Adapter; - - beforeEach(() => { - // Initialize the Adapter instance with mock data - adapter = new Adapter(defaultEvent); - }); - - afterEach(async () => { - // Clear all timers - jest.clearAllTimers(); - - // Clear all instances - jest.clearAllMocks(); - }); - - it('should add artifact to the list of artifacts', async () => { - const artifact = { - id: 'mockId', - item_type: 'mockItemType', - item_count: 1, - // Mock artifact data - }; - - adapter.update({ artifact: artifact }); - - const artifacts = adapter.getArtifacts(); - expect(artifacts).toContain(artifact); - - // Run all timers - jest.runAllTimers(); - }); - - it('should set extractor state if provided', async () => { - const state = { - testing: false, - }; - - adapter = new Adapter( - { - ...defaultEvent, - }, - state - ); - - expect(adapter['extractorState']).toEqual(state); - - const newState = { - testing: true, - }; - - adapter.update({ extractor_state: newState }); - expect(adapter['extractorState']).toEqual(newState); - - // Run all timers - jest.runAllTimers(); - }); - - it('update both an artifact and extractor state', async () => { - const artifact = { - id: 'mockId', - item_type: 'mockItemType', - item_count: 1, - }; - - const state = { - willUpdateToTrue: false, - willAdd4: 0, - }; - - adapter.update({ artifact: artifact, extractor_state: state }); - - const artifacts = adapter.getArtifacts(); - expect(artifacts).toContain(artifact); - - expect(adapter['extractorState']).toEqual(state); - - const newState = { - willUpdateToTrue: true, - willAdd4: 4, - newField: 'newField', - }; - - const newArtifact = { - id: 'newMockId', - item_type: 'newMockItemType', - item_count: 2, - }; - - adapter.update({ artifact: newArtifact, extractor_state: newState }); - // Run all timers - jest.runAllTimers(); - }); -}); diff --git a/src/adapter/index.ts b/src/adapter/index.ts index 35596d6..dcc23c5 100644 --- a/src/adapter/index.ts +++ b/src/adapter/index.ts @@ -1,86 +1,116 @@ import axios from 'axios'; import { - Artifact, AirdropEvent, ExtractorEventType, ExtractorEvent, EventData, + AdapterState, + Artifact, } from '../types'; -import { AdapterUpdateParams } from '../types/common'; -import { getTimeoutExtractorEventType } from '../adapter/helpers'; +import { STATELESS_EVENT_TYPES } from '../common/constants'; +import { getTimeoutExtractorEventType } from '../common/helpers'; +import { Logger } from '../logging'; +import { State, createAdapterState } from '../state'; /** * Adapter class is used to interact with Airdrop platform. The class provides * utilities to * - emit control events to the platform * - update the state of the extractor - * - add artifacts to the list of artifacts to be returned to the platform - * - Return the last saved state and artifacts in case timeout - * - The event sent in case of timeout for each event type is as follows: - * - EXTRACTION_EXTERNAL_SYNC_UNITS_START => EXTRACTION_EXTERNAL_SYNC_UNITS_ERROR - * - EXTRACTION_METADATA_START => EXTRACTION_METADATA_ERROR - * - EXTRACTION_DATA_START => EXTRACTION_DATA_PROGRESS - * - EXTRACTION_DATA_CONTINUE => EXTRACTION_DATA_PROGRESS - * - EXTRACTION_ATTACHMENTS_START => EXTRACTION_ATTACHMENTS_PROGRESS - * - EXTRACTION_ATTACHMENTS_CONTINUE => EXTRACTION_ATTACHMENTS_PROGRESS + * - set the last saved state in case of a timeout * * @class Adapter * @constructor * @param {AirdropEvent} event - The event object received from the platform - * @param {object=} state - Optional state object to be passed to the extractor. If not provided, an empty object is used. + * @param {object=} initialState - The initial state of the adapter + * @param {boolean=} isLocalDevelopment - A flag to indicate if the adapter is being used in local development + */ + +/** + * Creates an adapter instance. + * + * @param {AirdropEvent} event - The event object received from the platform + * @param initialState + * @param {boolean=} isLocalDevelopment - A flag to indicate if the adapter is being used in local development + * @return The adapter instance */ -export class Adapter { - private artifacts: Artifact[]; - /** Adapter level state to return to the platform */ - private extractorState: object; + +export async function createAdapter( + event: AirdropEvent, + initialState: ExtractorState, + isLocalDevelopment: boolean = false +) { + const newInitialState = structuredClone(initialState); + const adapterState: State = await createAdapterState( + event, + newInitialState + ); + + const a = new Adapter( + event, + adapterState, + isLocalDevelopment + ); + + return a; +} + +export class Adapter { + private adapterState: State; + private _artifacts: Artifact[]; private event: AirdropEvent; private callbackUrl: string; private devrevToken: string; + private startTime: number; + private heartBeatFn: NodeJS.Timeout; + private exit: boolean = false; + private lambdaTimeout: number = 10 * 60 * 1000; // 10 minutes in milliseconds + private heartBeatInterval: number = 30 * 1000; // 30 seconds in milliseconds - constructor(event: AirdropEvent, state?: object) { - this.event = event; - this.artifacts = []; + constructor( + event: AirdropEvent, + adapterState: State, + isLocalDevelopment: boolean = false + ) { + if (!isLocalDevelopment) { + Logger.init(event); + } - this.extractorState = state || {}; + this.adapterState = adapterState; + this._artifacts = []; + this.event = event; this.callbackUrl = event.payload.event_context.callback_url; - this.devrevToken = event.context.secrets["service_account_token"]; - - // Once lambda is near to timeout, Snap-in needs to submit the information about artifacts - // that have been uploaded and state, so next time it is run, can continue where it has left off - setTimeout( - async () => { - const extractorEventType = getTimeoutExtractorEventType( - this.event.payload.event_type - ); - - if (extractorEventType) { - await this.emit(extractorEventType, { artifacts: this.artifacts }); - } - }, - 12 * 60 * 1000 - ); + this.devrevToken = event.context.secrets.service_account_token; + + this.startTime = Date.now(); + + // Run heartbeat every 30 seconds + this.heartBeatFn = setInterval(async () => { + const b = await this.heartbeat(); + if (b) { + this.exitAdapter(); + } + }, this.heartBeatInterval); } - /** - * Adds artifact to the list of artifacts. - * Overrides extractor state if provided. - * - * @param {AdapterUpdateParams} params - The parameters to update the adapter - * @param {Artifact=} params.artifact - The artifact to be added to the list of artifacts - * @param {object=} params.extractor_state - The state object to be updated - */ - async update(params: AdapterUpdateParams) { - if (params.artifact) { - this.artifacts.push(params.artifact); - } + get state(): AdapterState { + return this.adapterState.state; + } - if (params.extractor_state) { - this.extractorState = params.extractor_state; - } + set state(value: AdapterState) { + this.adapterState.state = value; + } + + get artifacts(): Artifact[] { + return this._artifacts; + } + + set artifacts(value: Artifact[]) { + this._artifacts = value; } /** @@ -90,26 +120,33 @@ export class Adapter { * @param {EventData=} data - The data to be sent with the event */ async emit(newEventType: ExtractorEventType, data?: EventData) { + if (this.exit) { + console.warn( + 'Adapter is already in exit state. No more events can be emitted.' + ); + return; + } + + // We want to save the state every time we emit an event, except for the start and delete events + if (!STATELESS_EVENT_TYPES.includes(this.event.payload.event_type)) { + console.log(`Saving state before emitting event`); + await this.adapterState.postState(this.state); + } + const newEvent: ExtractorEvent = { - extractor_state: JSON.stringify(this.extractorState), event_type: newEventType, event_context: { uuid: this.event.payload.event_context.uuid, sync_run: this.event.payload.event_context.sync_run_id, - }, + ...(this.event.payload.event_context.sync_unit_id && { + sync_unit: this.event.payload.event_context.sync_unit_id, + }), + }, event_data: { ...data, - artifacts: this.artifacts, }, }; - // If sync_unit_id is present in the event, add it to the new event - if (this.event.payload.event_context.sync_unit_id) { - newEvent.event_context.sync_unit = - this.event.payload.event_context.sync_unit_id; - } - console.log('Event that will be emitted: ' + JSON.stringify(newEvent)); - try { await axios.post( this.callbackUrl, @@ -122,24 +159,50 @@ export class Adapter { }, } ); + + console.log('Successfully emitted event: ' + JSON.stringify(newEvent)); } catch (error) { // If this request fails the extraction will be stuck in loop and // we need to stop it through UI or think about retrying this request console.log( - 'Emitting failed for this event: ' + + 'Failed to emit event: ' + JSON.stringify(newEvent) + ', error: ' + error ); + } finally { + this.exitAdapter(); } } /** - * Returns the list of artifacts stored in the adapter. - * - * @return The list of artifacts + * Exit the adapter. This will stop the heartbeat and no + * further events will be emitted. */ - getArtifacts(): Artifact[] { - return this.artifacts; + private exitAdapter() { + this.exit = true; + clearInterval(this.heartBeatFn); + } + + /** + * Heartbeat function to check if the lambda is about to timeout. + * @returns true if 10 minutes have passed since the start of the lambda. + */ + private async heartbeat(): Promise { + if (this.exit) { + return true; + } + if (Date.now() - this.startTime > this.lambdaTimeout) { + const timeoutEventType = getTimeoutExtractorEventType( + this.event.payload.event_type + ); + if (timeoutEventType !== null) { + const { eventType, isError } = timeoutEventType; + const err = isError ? { message: 'Lambda Timeout' } : undefined; + await this.emit(eventType, { error: err, artifacts: this._artifacts }); + return true; + } + } + return false; } } diff --git a/src/common/constants.ts b/src/common/constants.ts new file mode 100644 index 0000000..116b7ce --- /dev/null +++ b/src/common/constants.ts @@ -0,0 +1,8 @@ +import { EventType } from '../types'; + +export const STATELESS_EVENT_TYPES = [ + EventType.ExtractionExternalSyncUnitsStart, + EventType.ExtractionMetadataStart, + EventType.ExtractionDataDelete, + EventType.ExtractionAttachmentsDelete, +]; diff --git a/src/adapter/helpers.ts b/src/common/helpers.ts similarity index 69% rename from src/adapter/helpers.ts rename to src/common/helpers.ts index a80f309..b6bf28c 100644 --- a/src/adapter/helpers.ts +++ b/src/common/helpers.ts @@ -33,20 +33,33 @@ export function createArtifact( } as Artifact; } -export function getTimeoutExtractorEventType( - eventType: EventType -): ExtractorEventType | null { +export function getTimeoutExtractorEventType(eventType: EventType): { + eventType: ExtractorEventType; + isError: boolean; +} | null { switch (eventType) { case EventType.ExtractionMetadataStart: - return ExtractorEventType.ExtractionMetadataError; + return { + eventType: ExtractorEventType.ExtractionMetadataError, + isError: true, + }; case EventType.ExtractionDataStart: case EventType.ExtractionDataContinue: - return ExtractorEventType.ExtractionDataProgress; + return { + eventType: ExtractorEventType.ExtractionDataProgress, + isError: false, + }; case EventType.ExtractionAttachmentsStart: case EventType.ExtractionAttachmentsContinue: - return ExtractorEventType.ExtractionAttachmentsProgress; + return { + eventType: ExtractorEventType.ExtractionAttachmentsProgress, + isError: false, + }; case EventType.ExtractionExternalSyncUnitsStart: - return ExtractorEventType.ExtractionExternalSyncUnitsError; + return { + eventType: ExtractorEventType.ExtractionExternalSyncUnitsError, + isError: true, + }; default: console.log( 'Event type not recognized in getTimeoutExtractorEventType function: ' + diff --git a/src/demo-extractor/index.ts b/src/demo-extractor/index.ts index c34b9c0..2927816 100644 --- a/src/demo-extractor/index.ts +++ b/src/demo-extractor/index.ts @@ -8,34 +8,41 @@ import { import { Adapter } from '../adapter'; import { Uploader } from '../uploader'; -import recipeJson from './recipe.json'; +import extractorInitialDomainMapping from './initial_domain_mapping.json'; -export class DemoExtractor { - async run(event: AirdropEvent) { - console.log( - 'Event in DemoExtractor run function: ' + JSON.stringify(event) - ); +type ExtractorState = object; - const adapter = new Adapter(event); - const uploader = new Uploader( - event.execution_metadata.devrev_endpoint, - event.context.secrets["service_account_token"] +export class DemoExtractor { + private event: AirdropEvent; + private adapter: Adapter; + private uploader: Uploader; + + constructor(event: AirdropEvent, adapter: Adapter) { + this.event = event; + this.adapter = adapter; + this.uploader = new Uploader( + this.event.execution_metadata.devrev_endpoint, + this.event.context.secrets.service_account_token ); + } - switch (event.payload.event_type) { + async run() { + switch (this.event.payload.event_type) { case EventType.ExtractionExternalSyncUnitsStart: { const externalSyncUnits: ExternalSyncUnit[] = [ { id: 'devrev', name: 'devrev', - description: 'Loopback for DevRev', - item_count: 0, + description: 'Demo external sync unit', }, ]; - await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, { - external_sync_units: externalSyncUnits, - }); + await this.adapter.emit( + ExtractorEventType.ExtractionExternalSyncUnitsDone, + { + external_sync_units: externalSyncUnits, + } + ); break; } @@ -44,157 +51,189 @@ export class DemoExtractor { const metadata = [ { item: 'contacts', - fields: ['name', 'lastName'], + fields: ['id', 'name', 'lastName'], }, { item: 'users', - fields: ['name', 'lastName'], + fields: ['id', 'name', 'lastName'], }, ]; - const { artifact, error } = await uploader.upload( + const { artifact, error } = await this.uploader.upload( 'loopback_metadata_1.jsonl', 'metadata', metadata ); - if (error) { - await adapter.emit(ExtractorEventType.ExtractionMetadataError, { + if (error || !artifact) { + await this.adapter.emit(ExtractorEventType.ExtractionMetadataError, { error, }); - } else { - await adapter.update({ artifact }); - await adapter.emit(ExtractorEventType.ExtractionMetadataDone); + return; } + const { artifact: recipe, error: recipeError } = + await this.uploader.upload( + 'recipe.json', + 'initial_domain_mapping', + extractorInitialDomainMapping + ); + + if (recipeError || !recipe) { + await this.adapter.emit(ExtractorEventType.ExtractionMetadataError, { + error: recipeError, + }); + return; + } + + await this.adapter.emit(ExtractorEventType.ExtractionMetadataDone, { + progress: 50, + artifacts: [artifact, recipe], + }); + break; } case EventType.ExtractionDataStart: { const contacts = [ { + id: 1, name: 'John', lastName: 'Doe', }, { + id: 2, name: 'Jane', lastName: 'Doe', }, ]; - const { artifact, error } = await uploader.upload( + const { artifact, error } = await this.uploader.upload( 'loopback_contacts_1.json', 'contacts', contacts ); - if (error) { - await adapter.emit(ExtractorEventType.ExtractionDataError, { + if (error || !artifact) { + await this.adapter.emit(ExtractorEventType.ExtractionDataError, { error, }); - } else { - await adapter.update({ artifact }); - await adapter.emit(ExtractorEventType.ExtractionDataProgress, { - progress: 50, - }); + + return; } + await this.adapter.emit(ExtractorEventType.ExtractionDataProgress, { + progress: 50, + artifacts: [artifact], + }); + break; } case EventType.ExtractionDataContinue: { const users = [ { + id: 1, name: 'John', lastName: 'Phd', }, { + id: 2, name: 'Jane', lastName: 'Phd', }, ]; - const { artifact, error } = await uploader.upload( + const { artifact, error } = await this.uploader.upload( 'loopback_users_1.json', 'users', users ); - if (error) { - await adapter.emit(ExtractorEventType.ExtractionDataError, { + if (error || !artifact) { + await this.adapter.emit(ExtractorEventType.ExtractionDataError, { error, }); - } else { - await adapter.update({ artifact }); - - // TODO: Add separated function for uploading recipe.json? - const { artifact: recipe, error } = await uploader.upload( - 'recipe.json', - 'initial_domain_mapping', - recipeJson - ); - await adapter.update({ artifact: recipe }); - - await adapter.emit(ExtractorEventType.ExtractionDataDone, { - progress: 100, - }); + return; } + await this.adapter.emit(ExtractorEventType.ExtractionDataDone, { + progress: 100, + artifacts: [artifact], + }); + break; } case EventType.ExtractionDataDelete: { - await adapter.emit(ExtractorEventType.ExtractionDataDeleteDone); + await this.adapter.emit(ExtractorEventType.ExtractionDataDeleteDone); break; } case EventType.ExtractionAttachmentsStart: { const attachment1 = ['This is attachment1.txt content']; - const { artifact, error } = await uploader.upload( + const { artifact, error } = await this.uploader.upload( 'attachment1.txt', 'attachment', attachment1 ); - if (error) { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsError, { - error, - }); - } else { - await adapter.update({ artifact }); - await adapter.emit(ExtractorEventType.ExtractionAttachmentsProgress); + if (error || !artifact) { + await this.adapter.emit( + ExtractorEventType.ExtractionAttachmentsError, + { + error, + } + ); + return; } + + await this.adapter.emit( + ExtractorEventType.ExtractionAttachmentsProgress, + { + artifacts: [artifact], + } + ); + break; } case EventType.ExtractionAttachmentsContinue: { const attachment2 = ['This is attachment2.txt content']; - const { artifact, error } = await uploader.upload( + const { artifact, error } = await this.uploader.upload( 'attachment2.txt', 'attachment', attachment2 ); - if (error) { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsError, { - error, - }); - } else { - await adapter.update({ artifact }); - await adapter.emit(ExtractorEventType.ExtractionAttachmentsDone); + if (error || !artifact) { + await this.adapter.emit( + ExtractorEventType.ExtractionAttachmentsError, + { + error, + } + ); + return; } + await this.adapter.emit(ExtractorEventType.ExtractionAttachmentsDone, { + artifacts: [artifact], + }); + break; } case EventType.ExtractionAttachmentsDelete: { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsDeleteDone); + await this.adapter.emit( + ExtractorEventType.ExtractionAttachmentsDeleteDone + ); break; } default: { - console.log( - 'Event in DemoExtractor run not recognized: ' + JSON.stringify(event) + console.error( + 'Event in DemoExtractor run not recognized: ' + + JSON.stringify(this.event.payload.event_type) ); } } diff --git a/src/demo-extractor/initial_domain_mapping.json b/src/demo-extractor/initial_domain_mapping.json new file mode 100644 index 0000000..b0d2c82 --- /dev/null +++ b/src/demo-extractor/initial_domain_mapping.json @@ -0,0 +1,107 @@ +{ + "initial_object_mappings": [ + { + "external_type": "contacts", + "possible_targets": { + "rev_user": { + "initial_field_mappings": [ + { + "destination_field": { + "id": "id", + "is_required": true + }, + "selected_resolutions": ["id"], + "resolutions": { + "id": { + "source_field_description": { + "id": "id", + "is_required": true + } + } + }, + "is_finished": true + }, + { + "destination_field": { + "id": "display_name", + "is_required": true + }, + "selected_resolutions": ["name"], + "resolutions": { + "name": { + "source_field_description": { + "id": "name", + "is_required": true + } + } + }, + "is_finished": true + } + ] + } + }, + "default_target": "rev_user", + "allow_item_type_decisions": true + }, + { + "external_type": "users", + "possible_targets": { + "dev_user": { + "initial_field_mappings": [ + { + "destination_field": { + "id": "id", + "is_required": true + }, + "selected_resolutions": ["id"], + "resolutions": { + "id": { + "source_field_description": { + "id": "id", + "is_required": true + } + } + }, + "is_finished": true + }, + { + "destination_field": { + "id": "display_name", + "is_required": true + }, + "selected_resolutions": ["name"], + "resolutions": { + "name": { + "source_field_description": { + "id": "name", + "is_required": true + } + } + }, + "is_finished": true + } + ] + } + }, + "default_target": "dev_user", + "allow_item_type_decisions": true + }, + { + "external_type": "tickets", + "possible_targets": { + "work.ticket": {} + }, + "default_target": "work.ticket", + "allow_item_type_decisions": true + }, + { + "external_type": "conversations", + "possible_targets": { + "comment": {} + }, + "default_target": "comment", + "allow_item_type_decisions": true + } + ], + "external_system_short_name": "Freshdesk" +} diff --git a/src/demo-extractor/recipe.json b/src/demo-extractor/recipe.json deleted file mode 100644 index 231f216..0000000 --- a/src/demo-extractor/recipe.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "initial_object_mappings": [ - { - "external_type": "contacts", - "possible_targets": { - "rev_user": {} - }, - "default_target": "rev_user", - "allow_item_type_decisions": true - }, - { - "external_type": "users", - "possible_targets": { - "dev_user": {} - }, - "default_target": "dev_user", - "allow_item_type_decisions": true - }, - { - "external_type": "tickets", - "possible_targets": { - "work.ticket": {} - }, - "default_target": "work.ticket", - "allow_item_type_decisions": true - }, - { - "external_type": "conversations", - "possible_targets": { - "comment": {} - }, - "default_target": "comment", - "allow_item_type_decisions": true - } - ], - "external_system_short_name": "source-system" -} diff --git a/src/http/client.ts b/src/http/client.ts index 38f8700..95fc7ab 100644 --- a/src/http/client.ts +++ b/src/http/client.ts @@ -1,6 +1,5 @@ import axios, { InternalAxiosRequestConfig, isAxiosError } from 'axios'; import { - LAMBDA_LIMIT_EXCEEDED, RATE_LIMIT_EXCEEDED, RATE_LIMIT_EXCEEDED_STATUS_CODE, } from './constants'; @@ -16,7 +15,7 @@ export const defaultResponse: HTTPResponse = { success: false, }; -export default class HTTPClient { +export class HTTPClient { private retryAfter = 0; private retryAt = 0; private axiosInstance = axios.create(); diff --git a/src/http/index.ts b/src/http/index.ts index c99ac5c..d860d06 100644 --- a/src/http/index.ts +++ b/src/http/index.ts @@ -1,3 +1,2 @@ export * from './client'; export * from './types'; -export * from './constants'; diff --git a/src/index.ts b/src/index.ts index 23e2dcc..c266947 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,3 +2,4 @@ export * from './adapter'; export * from './demo-extractor'; export * from './uploader'; export * from './types'; +export * from './http'; diff --git a/src/logging/index.ts b/src/logging/index.ts index 7cfc9eb..af633df 100644 --- a/src/logging/index.ts +++ b/src/logging/index.ts @@ -1,8 +1,9 @@ -enum LogLevel { +import { AirdropEvent } from '../types'; + +export enum LogLevel { INFO = 'info', WARN = 'warn', ERROR = 'error', - DEBUG = 'debug', } /** @@ -11,63 +12,37 @@ enum LogLevel { * - INFO * - WARN * - ERROR - * - DEBUG * * The log tags can be set to any key-value pair. */ -class Logger { - private level: LogLevel; - private logTags: Record = {}; - private static instance: Logger; - - // Private constructor to prevent instantiation - private constructor(level: LogLevel, logTags: Record = {}) { - this.level = level; - this.logTags = logTags; - } - - private log(level: LogLevel, message: string): void { - if (this.level === level) { - console.log(`[${level.toUpperCase()}]: ${message}`, this.logTags); - } - } - - // Singleton instance - public static getInstance( - level: LogLevel, - logTags: Record = {} - ): Logger { - if (!Logger.instance) { - Logger.instance = new Logger(level, logTags); - } - return Logger.instance; - } - - // Set log tags destructively. - public setTags(tags: Record): void { - this.logTags = tags; - } - - // Add log tags. If the key already exists, it will be overwritten. - public addTags(tags: Record): void { - this.logTags = { ...this.logTags, ...tags }; - } - - info(message: string): void { - this.log(LogLevel.INFO, message); - } - - warn(message: string): void { - this.log(LogLevel.WARN, message); - } - - error(message: string): void { - this.log(LogLevel.ERROR, message); - } - - debug(message: string): void { - this.log(LogLevel.DEBUG, message); +export class Logger { + public static init(event: AirdropEvent) { + const origLog = console.log; + console.log = (message: string) => { + origLog(`[${LogLevel.INFO.toUpperCase()}]: ${message}`, { + ...event.payload.event_context, + }); + }; + + const origInfo = console.info; + console.info = (message: string) => { + origInfo(`[${LogLevel.INFO.toUpperCase()}]: ${message}`, { + ...event.payload.event_context, + }); + }; + + const origWarn = console.warn; + console.warn = (message: string) => { + origWarn(`[${LogLevel.WARN.toUpperCase()}]: ${message}`, { + ...event.payload.event_context, + }); + }; + + const origError = console.error; + console.error = (message: string) => { + origError(`[${LogLevel.ERROR.toUpperCase()}]: ${message}`, { + ...event.payload.event_context, + }); + }; } } - -export default Logger; diff --git a/src/state/index.ts b/src/state/index.ts new file mode 100644 index 0000000..cb1e1ea --- /dev/null +++ b/src/state/index.ts @@ -0,0 +1,126 @@ +import axios, { isAxiosError } from 'axios'; + +import { AdapterState, AirdropEvent } from '../types'; +import { STATELESS_EVENT_TYPES } from '../common/constants'; + +export async function createAdapterState( + event: AirdropEvent, + initialState: ExtractorState +) { + const newInitialState = structuredClone(initialState); + const as = new State(event, newInitialState); + + if (!STATELESS_EVENT_TYPES.includes(event.payload.event_type)) { + console.log('Fetching state'); + await as.fetchState(newInitialState); + } + + return as; +} + +export class State { + private _state: AdapterState; + + private event: AirdropEvent; + private workerUrl: string; + private devrevToken: string; + + constructor(event: AirdropEvent, initialState: ExtractorState) { + this._state = { + lastSyncStarted: '', + lastSuccessfulSyncStarted: '', + ...initialState, + }; + + this.event = event; + this.workerUrl = event.payload.event_context.worker_data_url; + this.devrevToken = event.context.secrets.service_account_token; + } + + get state() { + return this._state; + } + + set state(value) { + this._state = value; + } + + /** + * Updates the state of the adapter. + * + * @param {object} state - The state to be updated + */ + async postState(state: AdapterState) { + try { + await axios.post( + this.workerUrl + '.update', + { + state: JSON.stringify(state), + }, + { + headers: { + Authorization: this.devrevToken, + }, + params: { + sync_unit: this.event.payload.event_context.sync_unit_id, + }, + } + ); + + this.state = state; + console.log('State updated successfully'); + } catch (error) { + console.error('Failed to update state, error:' + error); + } + } + + /** + * Fetches the state of the adapter. + * + * @return The state of the adapter + */ + async fetchState( + initialState: ExtractorState + ): Promise | unknown> { + const state: AdapterState = { + ...initialState, + lastSyncStarted: '', + lastSuccessfulSyncStarted: '', + }; + + console.log( + 'Fetching state with sync unit id: ' + + this.event.payload.event_context.sync_unit_id + ); + + try { + const response = await axios.post( + this.workerUrl + '.get', + {}, + { + headers: { + Authorization: this.devrevToken, + }, + params: { + sync_unit: this.event.payload.event_context.sync_unit_id, + }, + } + ); + + this.state = JSON.parse(response.data.state); + + console.log('State fetched successfully'); + return this.state; + } catch (error) { + if (isAxiosError(error) && error.response?.status === 404) { + console.log('State not found, returning initial state'); + this.state = state; + this.postState(this.state); + return this.state; + } + + console.error('Failed to fetch state, error:' + error); + return error; + } + } +} diff --git a/src/types/extraction.ts b/src/types/extraction.ts index a5304dc..0f36277 100644 --- a/src/types/extraction.ts +++ b/src/types/extraction.ts @@ -1,4 +1,4 @@ -import { InputData,Context,ExecutionMetadata } from '@devrev/typescript-sdk/dist/snap-ins'; +import { InputData } from '@devrev/typescript-sdk/dist/snap-ins'; import { Artifact, ErrorRecord } from './common'; @@ -85,7 +85,7 @@ export interface ExternalSyncUnit { // Description of the external sync unit (either extracted or defined by the extractor) description: string; // Number of items in the external sync unit (if known, otherwise can be set to 0) - item_count: number; + item_count?: number; } // EventContextIn holds all the information that an external extrator needs to know about the extraction @@ -109,6 +109,8 @@ export interface EventContextIn { external_system_id: string; // ID of the source organization in which the extraction is happening uuid: string; + // URL of the worker data endpoint + worker_data_url: string; } export interface EventContextOut { @@ -144,9 +146,15 @@ export interface DomainObjectState { } export interface AirdropEvent { - context: Context; + context: { + secrets: { + service_account_token: string; + }; + }; payload: AirdropMessage; - execution_metadata: ExecutionMetadata; + execution_metadata: { + devrev_endpoint: string; + }; input_data: InputData; } @@ -163,7 +171,12 @@ export interface ExtractorEvent { event_type: string; event_context: EventContextOut; // JSON of the external extractor state, not touched by the adapter, just forwarded - extractor_state: string; + extractor_state?: string; // Event data event_data?: EventData; } + +export type AdapterState = ExtractorState & { + lastSyncStarted?: string; + lastSuccessfulSyncStarted?: string; +}; diff --git a/src/uploader/index.ts b/src/uploader/index.ts index 3877ee4..6301838 100644 --- a/src/uploader/index.ts +++ b/src/uploader/index.ts @@ -1,8 +1,8 @@ import axios from 'axios'; import { betaSDK, client } from '@devrev/typescript-sdk'; - -import { createFormData } from '../adapter/helpers'; +import fs, { promises as fsPromises } from 'fs'; +import { createFormData } from '../common/helpers'; import { Artifact, UploadResponse } from '../types/common'; /** @@ -16,17 +16,17 @@ import { Artifact, UploadResponse } from '../types/common'; * @constructor * @param {string} endpoint - The endpoint of the DevRev platform * @param {string} token - The token to authenticate with the DevRev platform + * @param {boolean} local - Flag to indicate if the uploader should upload to the file-system. */ export class Uploader { private betaDevrevSdk; - private publicDevrevSdk; - - constructor(endpoint: string, token: string) { + private local: boolean; + constructor(endpoint: string, token: string, local = false) { this.betaDevrevSdk = client.setupBeta({ endpoint, token, }); - this.publicDevrevSdk = client.setup({ endpoint, token }); + this.local = local; } /** @@ -46,6 +46,10 @@ export class Uploader { fetchedObjects: object[] | object, filetype: string = 'application/jsonl+json' ): Promise { + if (this.local) { + this.downloadToLocal(filename, fetchedObjects); + } + const preparedArtifact = await this.prepareArtifact(filename, filetype); if (!preparedArtifact) { @@ -75,6 +79,8 @@ export class Uploader { item_count: itemCount, }; + console.log(`Artifact uploaded successfully: ${artifact.id}`); + return { artifact, error: undefined }; } @@ -90,7 +96,8 @@ export class Uploader { return response.data; } catch (error) { - throw new Error('Error while fetching upload url: ' + error); + console.error('Error while preparing artifact: ' + error); + return null; } } @@ -113,4 +120,37 @@ export class Uploader { return null; } } + + private async downloadToLocal( + filePath: string, + fetchedObjects: object | object[] + ) { + console.log(`Uploading ${filePath} to local file system`); + try { + if (!fs.existsSync('extracted_files')) { + fs.mkdirSync('extracted_files'); + } + + const timestamp = new Date().getTime(); + const fileHandle = await fsPromises.open( + `extracted_files/${timestamp}_${filePath}`, + 'w' + ); + let objArray = []; + if (!Array.isArray(fetchedObjects)) { + objArray.push(fetchedObjects); + } else { + objArray = fetchedObjects; + } + for (const jsonObject of objArray) { + const jsonLine = JSON.stringify(jsonObject) + '\n'; + await fileHandle.write(jsonLine); + } + await fileHandle.close(); + console.log('Data successfully written to', filePath); + } catch (error) { + console.error('Error writing data to file:', error); + return Promise.reject(error); + } + } } diff --git a/tests/adapter.helpers.test.ts b/tests/adapter.helpers.test.ts new file mode 100644 index 0000000..0adc65f --- /dev/null +++ b/tests/adapter.helpers.test.ts @@ -0,0 +1,85 @@ +import { + createFormData, + createArtifact, + getTimeoutExtractorEventType, +} from '../src/common/helpers'; + +import { EventType, ExtractorEventType } from '../src/types'; + +describe('adapter.helpers.ts', () => { + it("should create a FormData object with the correct 'key' and 'value'", () => { + const preparedArtifact = { + form_data: [{ key: 'key', value: 'value' }], + }; + const fetchedObjects = [{ key: 'value' }]; + + const formData = createFormData(preparedArtifact, fetchedObjects); + + expect(formData).toBeInstanceOf(FormData); + expect(formData.get('key')).toBe('value'); + }); + + it("should create a FormData object with the correct 'file' key", () => { + const preparedArtifact = { + form_data: [{ key: 'key', value: 'value' }], + }; + const fetchedObjects = [{ key: 'value' }]; + + const formData = createFormData(preparedArtifact, fetchedObjects); + + expect(formData.get('file')).toBeDefined(); + }); + + it("should create an Artifact object with the correct 'item_count', 'id', and 'item_type'", () => { + const preparedArtifact = { id: 'id' }; + const fetchedObjects = [{ key: 'value' }]; + const entity = 'entity'; + + const artifact = createArtifact(preparedArtifact, fetchedObjects, entity); + + expect(artifact).toEqual({ + item_count: 1, + id: 'id', + item_type: 'entity', + }); + }); + + it('should return the correct ExtractorEventType', () => { + expect( + getTimeoutExtractorEventType(EventType.ExtractionMetadataStart) + ).toStrictEqual({ + eventType: ExtractorEventType.ExtractionMetadataError, + isError: true, + }); + expect( + getTimeoutExtractorEventType(EventType.ExtractionDataStart) + ).toStrictEqual({ + eventType: ExtractorEventType.ExtractionDataProgress, + isError: false, + }); + expect( + getTimeoutExtractorEventType(EventType.ExtractionDataContinue) + ).toStrictEqual({ + eventType: ExtractorEventType.ExtractionDataProgress, + isError: false, + }); + expect( + getTimeoutExtractorEventType(EventType.ExtractionAttachmentsStart) + ).toStrictEqual({ + eventType: ExtractorEventType.ExtractionAttachmentsProgress, + isError: false, + }); + expect( + getTimeoutExtractorEventType(EventType.ExtractionAttachmentsContinue) + ).toStrictEqual({ + eventType: ExtractorEventType.ExtractionAttachmentsProgress, + isError: false, + }); + expect( + getTimeoutExtractorEventType(EventType.ExtractionExternalSyncUnitsStart) + ).toStrictEqual({ + eventType: ExtractorEventType.ExtractionExternalSyncUnitsError, + isError: true, + }); + }); +}); diff --git a/tests/adapter.test.ts b/tests/adapter.test.ts index 7f60bbc..90dd5ca 100644 --- a/tests/adapter.test.ts +++ b/tests/adapter.test.ts @@ -1,85 +1,180 @@ import axios from 'axios'; -import { Adapter } from '../src/adapter'; -import { EventType, ExtractorEventType } from '../src/types'; -import { createAirdropEvent } from './test-helpers'; +import { createAdapter, Adapter } from '../src/adapter'; +import { State, createAdapterState } from '../src/state'; +import { + AirdropEvent, + EventType, + ExtractorEventType, + EventData, +} from '../src/types'; +import { getTimeoutExtractorEventType } from '../src/common/helpers'; + +jest.mock('axios'); +jest.mock('../src/state'); +jest.mock('../src/common/helpers'); +jest.mock('../src/logging'); + +const mockedAxios = axios as jest.Mocked; +const mockedCreateAdapterState = createAdapterState as jest.MockedFunction< + typeof createAdapterState +>; +const mockedGetTimeoutExtractorEventType = + getTimeoutExtractorEventType as jest.MockedFunction< + typeof getTimeoutExtractorEventType + >; describe('Adapter', () => { - it('should be able to create an instance of Adapter', () => { - const event = createAirdropEvent( - EventType.ExtractionExternalSyncUnitsStart - ); - const adapter = new Adapter(event); - expect(adapter).toBeInstanceOf(Adapter); + const event: AirdropEvent = { + execution_metadata: { + devrev_endpoint: 'devrev_endpoint', + }, + context: { + secrets: { + service_account_token: 'service_account_token', + }, + }, + payload: { + connection_data: { + org_id: 'org_id', + org_name: 'org_name', + key: 'key', + key_type: 'key_type', + }, + event_context: { + mode: 'mode', + callback_url: 'callback_url', + dev_org_id: 'dev_org_id', + dev_user_id: 'dev_user_id', + external_system_id: 'external_system_id', + uuid: 'uuid', + sync_run_id: 'sync_run_id', + sync_unit_id: 'sync_unit_id', + worker_data_url: 'worker_data_url', + }, + event_type: EventType.ExtractionDataStart, + }, + input_data: { + global_values: {}, + event_sources: {}, + }, + }; + + const initialState = { + customField: 'initial', + }; + + beforeEach(() => { + jest.clearAllMocks(); }); - describe('update', () => { - it('should add an artifact to the artifacts array', () => { - const event = createAirdropEvent(EventType.ExtractionDataStart); - const adapter = new Adapter(event); - const artifact = { id: 'id', item_type: 'item_type', item_count: 1 }; - adapter.update({ artifact }); - expect(adapter.getArtifacts()).toContain(artifact); - }); + describe('createAdapter', () => { + it('should create an Adapter instance', async () => { + const adapterState = new State(event, initialState); + mockedCreateAdapterState.mockResolvedValue(adapterState); + + const adapter = await createAdapter(event, initialState, false); - it('should not add anything if artifact is not provided', () => { - const event = createAirdropEvent(EventType.ExtractionDataStart); - const adapter = new Adapter(event); - adapter.update({}); - expect(adapter.getArtifacts()).toHaveLength(0); + expect(adapter).toBeInstanceOf(Adapter); + expect(mockedCreateAdapterState).toHaveBeenCalledWith( + event, + expect.anything() + ); }); }); - describe('emit', () => { - it('should call axios.post with correct parameters - case without error', async () => { - const event = createAirdropEvent(EventType.ExtractionDataStart); - const adapter = new Adapter(event); - const axiosSpy = jest.spyOn(axios, 'post').mockResolvedValue({}); - const newEventType = ExtractorEventType.ExtractionDataDone; - await adapter.emit(newEventType); + describe('Adapter', () => { + let adapter: Adapter; + let adapterState: State; - expect(axiosSpy).toHaveBeenCalledWith( - event.payload.event_context.callback_url, + beforeEach(() => { + adapterState = new State(event, initialState); + adapter = new Adapter(event, adapterState, false); + }); + + it('should emit event and save state if event type is not stateless', async () => { + const data: EventData = {}; + mockedAxios.post.mockResolvedValue({ data: {} }); + + await adapter.emit(ExtractorEventType.ExtractionDataDone, data); + + expect(mockedAxios.post).toHaveBeenCalledWith( + 'callback_url', expect.objectContaining({ - event_type: newEventType, + event_type: ExtractorEventType.ExtractionDataDone, }), - expect.objectContaining({ - headers: { - Accept: 'application/json, text/plain, */*', - Authorization: event.context.secrets["service_account_token"], - 'Content-Type': 'application/json', - }, - }) + expect.any(Object) ); - axiosSpy.mockRestore(); + expect(adapterState.postState).toHaveBeenCalledWith(adapter.state); }); - it('should call axios.post with correct parameters - case with error', async () => { - const event = createAirdropEvent(EventType.ExtractionDataStart); - const adapter = new Adapter(event); - const axiosSpy = jest.spyOn(axios, 'post').mockResolvedValue({}); - const newEventType = ExtractorEventType.ExtractionDataError; - const data = { - error: { message: 'some error message' }, + it('should not save state if event type is stateless', async () => { + const statelessEvent = { + ...event, + payload: { + ...event.payload, + event_type: EventType.ExtractionExternalSyncUnitsStart, + }, }; - await adapter.emit(newEventType, data); + adapter = new Adapter(statelessEvent, adapterState, false); + const data: EventData = {}; + mockedAxios.post.mockResolvedValue({ data: {} }); - expect(axiosSpy).toHaveBeenCalledWith( - event.payload.event_context.callback_url, + await adapter.emit( + ExtractorEventType.ExtractionExternalSyncUnitsDone, + data + ); + + expect(mockedAxios.post).toHaveBeenCalledWith( + 'callback_url', expect.objectContaining({ - event_type: newEventType, - event_data: expect.objectContaining({ - error: data.error, - }), + event_type: ExtractorEventType.ExtractionExternalSyncUnitsDone, }), + expect.any(Object) + ); + expect(adapterState.postState).not.toHaveBeenCalled(); + }); + + it('should exit adapter on heartbeat timeout', async () => { + mockedGetTimeoutExtractorEventType.mockReturnValue({ + eventType: ExtractorEventType.ExtractionMetadataError, + isError: true, + }); + jest + .spyOn(Date, 'now') + .mockImplementation( + () => adapter['startTime'] + adapter['lambdaTimeout'] + 1 + ); + + const heartbeatResult = await adapter['heartbeat'](); + + expect(heartbeatResult).toBe(true); + expect(mockedGetTimeoutExtractorEventType).toHaveBeenCalledWith( + event.payload.event_type + ); + expect(mockedAxios.post).toHaveBeenCalledWith( + 'callback_url', expect.objectContaining({ - headers: { - Accept: 'application/json, text/plain, */*', - Authorization: event.context.secrets["service_account_token"], - 'Content-Type': 'application/json', - }, - }) + event_type: ExtractorEventType.ExtractionMetadataError, + }), + expect.any(Object) ); - axiosSpy.mockRestore(); + }); + + it('should not emit event if adapter is in exit state', async () => { + adapter['exit'] = true; + + await adapter.emit(ExtractorEventType.ExtractionDataDone); + + expect(mockedAxios.post).not.toHaveBeenCalled(); + }); + + it('should handle failed event emission gracefully', async () => { + mockedAxios.post.mockRejectedValue(new Error('Network error')); + + await adapter.emit(ExtractorEventType.ExtractionDataDone); + + expect(mockedAxios.post).toHaveBeenCalled(); + expect(adapter['exit']).toBe(true); }); }); }); diff --git a/tests/demo-extractor.test.ts b/tests/demo-extractor.test.ts index 24854fe..9156ed3 100644 --- a/tests/demo-extractor.test.ts +++ b/tests/demo-extractor.test.ts @@ -1,153 +1,66 @@ import { AirdropEvent, EventType, ExtractorEventType } from '../src/types'; -import { Adapter } from '../src/adapter'; -import { Uploader } from '../src/uploader'; +import { createAdapter } from '../src/adapter'; import { DemoExtractor } from '../src/demo-extractor'; -import { createAirdropEvent } from './test-helpers'; -jest.mock('../src/adapter'); -jest.mock('../src/uploader'); +type ExtractorState = object; + +const mockEvent: AirdropEvent = { + execution_metadata: { + devrev_endpoint: 'devrev_endpoint', + }, + context: { + secrets: { + service_account_token: 'service_account_token', + }, + }, + payload: { + connection_data: { + org_id: 'org_id', + org_name: 'org_name', + key: 'key', + key_type: 'key_type', + }, + event_context: { + mode: 'mode', + callback_url: 'callback_url', + dev_org_id: 'dev_org_id', + dev_user_id: 'dev_user_id', + external_system_id: 'external_system_id', + uuid: 'uuid', + sync_run_id: 'sync_run_id', + worker_data_url: 'worker_data_url', + }, + event_type: EventType.ExtractionExternalSyncUnitsStart, + }, + input_data: { + global_values: {}, + event_sources: {}, + }, +}; describe('DemoExtractor', () => { - let adapterMock: jest.Mocked; - let uploaderMock: jest.Mocked; - let demoExtractor: DemoExtractor; - - beforeEach(() => { - adapterMock = new Adapter({} as AirdropEvent) as jest.Mocked; - uploaderMock = new Uploader('', '') as jest.Mocked; - (Adapter as jest.Mock).mockImplementation(() => adapterMock); - (Uploader as jest.Mock).mockImplementation(() => uploaderMock); - demoExtractor = new DemoExtractor(); - }); - - afterEach(() => { - jest.resetAllMocks(); - }); - - it('should be able to create an instance of DemoExtractor', () => { + it('should create a new instance of the DemoExtractor', async () => { + const adapter = await createAdapter(mockEvent, {}); + const demoExtractor = new DemoExtractor(mockEvent, adapter); expect(demoExtractor).toBeInstanceOf(DemoExtractor); }); - it('should emit ExtractionExternalSyncUnitsDone when EventType is ExtractionExternalSyncUnitsStart', async () => { - const event = createAirdropEvent( - EventType.ExtractionExternalSyncUnitsStart - ); - await demoExtractor.run(event); - expect(adapterMock.emit).toHaveBeenCalledWith( + it('should emit EXTRACTION_EXTERNAL_SYNC_UNITS_DONE with correct payload', async () => { + const adapter = await createAdapter(mockEvent, {}); + const demoExtractor = new DemoExtractor(mockEvent, adapter); + const spy = jest.spyOn(adapter, 'emit'); + await demoExtractor.run(); + expect(spy).toHaveBeenCalledWith( ExtractorEventType.ExtractionExternalSyncUnitsDone, - expect.objectContaining({ + { external_sync_units: [ { id: 'devrev', name: 'devrev', - description: 'Loopback for DevRev', - item_count: 0, + description: 'Demo external sync unit', }, ], - }) - ); - }); - - it('should emit ExtractionMetadataDone when metadata is uploaded successfully', async () => { - const event = createAirdropEvent(EventType.ExtractionMetadataStart); - const artifact = { id: 'id', item_type: 'item_type', item_count: 1 }; - - uploaderMock.upload.mockResolvedValueOnce({ artifact, error: undefined }); - - await demoExtractor.run(event); - - expect(adapterMock.update).toHaveBeenCalledWith({ artifact }); - expect(adapterMock.emit).toHaveBeenCalledWith( - ExtractorEventType.ExtractionMetadataDone - ); - }); - - it("should emit ExtractionMetadataError when there's an error uploading metadata", async () => { - const event = createAirdropEvent(EventType.ExtractionMetadataStart); - const error = new Error('Failed to upload metadata'); - - uploaderMock.upload.mockResolvedValueOnce({ artifact: undefined, error }); - - await demoExtractor.run(event); - - expect(adapterMock.emit).toHaveBeenCalledWith( - ExtractorEventType.ExtractionMetadataError, - { error } - ); - }); - - it('should emit ExtractionDataProgress when data is uploaded successfully', async () => { - const event = createAirdropEvent(EventType.ExtractionDataStart); - const artifact = { id: 'id', item_type: 'item_type', item_count: 1 }; - - uploaderMock.upload.mockResolvedValueOnce({ artifact, error: undefined }); - - await demoExtractor.run(event); - - expect(adapterMock.update).toHaveBeenCalledWith({ artifact }); - expect(adapterMock.emit).toHaveBeenCalledWith( - ExtractorEventType.ExtractionDataProgress, - { progress: 50 } - ); - }); - - it('should emit ExtractionDataDone when data is uploaded successfully', async () => { - const event = createAirdropEvent(EventType.ExtractionDataContinue); - const artifact = { id: 'id', item_type: 'item_type', item_count: 1 }; - - uploaderMock.upload - .mockResolvedValueOnce({ artifact, error: undefined }) // First call to upload (users data) - .mockResolvedValueOnce({ artifact, error: undefined }); // Second call to upload (recipe.json) - - await demoExtractor.run(event); - - expect(adapterMock.update).toHaveBeenCalledTimes(2); - expect(adapterMock.update).toHaveBeenCalledWith({ artifact }); - expect(adapterMock.emit).toHaveBeenCalledWith( - ExtractorEventType.ExtractionDataDone, - { progress: 100 } - ); - }); - - it("should emit ExtractionDataError when there's an error uploading data", async () => { - const event = createAirdropEvent(EventType.ExtractionDataStart); - const error = new Error('Failed to upload data'); - - uploaderMock.upload.mockResolvedValueOnce({ artifact: undefined, error }); - - await demoExtractor.run(event); - - expect(adapterMock.emit).toHaveBeenCalledWith( - ExtractorEventType.ExtractionDataError, - { error } - ); - }); - - it('should emit ExtractionAttachmentsDone when attachments are uploaded successfully', async () => { - const event = createAirdropEvent(EventType.ExtractionAttachmentsStart); - const artifact = { id: 'id', item_type: 'item_type', item_count: 1 }; - - uploaderMock.upload.mockResolvedValueOnce({ artifact, error: undefined }); - - await demoExtractor.run(event); - - expect(adapterMock.update).toHaveBeenCalledWith({ artifact }); - expect(adapterMock.emit).toHaveBeenCalledWith( - ExtractorEventType.ExtractionAttachmentsProgress - ); - }); - - it("should emit ExtractionAttachmentsError when there's an error uploading attachments", async () => { - const event = createAirdropEvent(EventType.ExtractionAttachmentsStart); - const error = new Error('Failed to upload attachment'); - - uploaderMock.upload.mockResolvedValueOnce({ artifact: undefined, error }); - - await demoExtractor.run(event); - - expect(adapterMock.emit).toHaveBeenCalledWith( - ExtractorEventType.ExtractionAttachmentsError, - { error } + } ); }); }); diff --git a/tests/helpers.test.ts b/tests/helpers.test.ts deleted file mode 100644 index fbc7738..0000000 --- a/tests/helpers.test.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { - createFormData, - createArtifact, - getTimeoutExtractorEventType, -} from '../src/adapter/helpers'; - -import { EventType, ExtractorEventType } from '../src/types'; - -describe('createFormData', () => { - it('should create a FormData object', () => { - const preparedArtifact = { - form_data: [{ key: 'key', value: 'value' }], - }; - const fetchedObjects = [{ key: 'value' }]; - - const formData = createFormData(preparedArtifact, fetchedObjects); - - expect(formData).toBeInstanceOf(FormData); - expect(formData.get('key')).toBe('value'); - }); -}); - -describe('createArtifact', () => { - it('should create an Artifact object', () => { - const preparedArtifact = { id: 'id' }; - const fetchedObjects = [{ key: 'value' }]; - const entity = 'entity'; - - const artifact = createArtifact(preparedArtifact, fetchedObjects, entity); - - expect(artifact).toEqual({ - item_count: 1, - id: 'id', - item_type: 'entity', - }); - }); -}); - -describe('getTimeoutExtractorEventType', () => { - it('should return the correct ExtractorEventType', () => { - expect( - getTimeoutExtractorEventType(EventType.ExtractionMetadataStart) - ).toBe(ExtractorEventType.ExtractionMetadataError); - expect(getTimeoutExtractorEventType(EventType.ExtractionDataStart)).toBe( - ExtractorEventType.ExtractionDataProgress - ); - expect(getTimeoutExtractorEventType(EventType.ExtractionDataContinue)).toBe( - ExtractorEventType.ExtractionDataProgress - ); - expect( - getTimeoutExtractorEventType(EventType.ExtractionAttachmentsStart) - ).toBe(ExtractorEventType.ExtractionAttachmentsProgress); - expect( - getTimeoutExtractorEventType(EventType.ExtractionAttachmentsContinue) - ).toBe(ExtractorEventType.ExtractionAttachmentsProgress); - expect( - getTimeoutExtractorEventType(EventType.ExtractionExternalSyncUnitsStart) - ).toBe(ExtractorEventType.ExtractionExternalSyncUnitsError); - }); -}); diff --git a/tests/state.test.ts b/tests/state.test.ts new file mode 100644 index 0000000..75be652 --- /dev/null +++ b/tests/state.test.ts @@ -0,0 +1,132 @@ +import axios from 'axios'; + +import { createAdapterState, State } from '../src/state'; +import { AirdropEvent, EventType } from '../src/types'; +import { STATELESS_EVENT_TYPES } from '../src/common/constants'; + +jest.mock('axios'); + +const mockedAxios = axios as jest.Mocked; + +describe('AdapterState', () => { + const event: AirdropEvent = { + execution_metadata: { + devrev_endpoint: 'devrev_endpoint', + }, + context: { + secrets: { + service_account_token: 'service_account_token', + }, + }, + payload: { + connection_data: { + org_id: 'org_id', + org_name: 'org_name', + key: 'key', + key_type: 'key_type', + }, + event_context: { + mode: 'mode', + callback_url: 'callback_url', + dev_org_id: 'dev_org_id', + dev_user_id: 'dev_user_id', + external_system_id: 'external_system_id', + uuid: 'uuid', + sync_run_id: 'sync_run_id', + sync_unit_id: 'sync_unit_id', + worker_data_url: 'worker_data_url', + }, + event_type: EventType.ExtractionDataStart, + }, + input_data: { + global_values: {}, + event_sources: {}, + }, + }; + + const initialState = { + customField: 'initial', + }; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + describe('createAdapterState', () => { + it('should create an AdapterState instance', async () => { + const adapterState = await createAdapterState(event, initialState); + expect(adapterState).toBeInstanceOf(State); + expect(adapterState.state).toEqual({ + ...initialState, + lastSyncStarted: '', + lastSuccessfulSyncStarted: '', + }); + }); + + it('should fetch state if event type is not stateless', async () => { + const spy = jest.spyOn(State.prototype, 'fetchState'); + await createAdapterState(event, initialState); + expect(spy).toHaveBeenCalled(); + }); + + it('should not fetch state if event type is stateless', async () => { + const statelessEvent = { + ...event, + payload: { ...event.payload, event_type: STATELESS_EVENT_TYPES[0] }, + }; + const spy = jest.spyOn(State.prototype, 'fetchState'); + await createAdapterState(statelessEvent, initialState); + expect(spy).not.toHaveBeenCalled(); + }); + }); + + describe('AdapterState', () => { + let adapterState: State; + + beforeEach(() => { + adapterState = new State(event, initialState); + }); + + it('should initialize with given state', () => { + expect(adapterState.state).toEqual({ + ...initialState, + lastSyncStarted: '', + lastSuccessfulSyncStarted: '', + }); + }); + + it('should update state via postState', async () => { + const newState = { ...initialState, customField: 'updated' }; + mockedAxios.post.mockResolvedValue({ data: {} }); + + await adapterState.postState(newState); + + expect(adapterState.state).toEqual(newState); + expect(mockedAxios.post).toHaveBeenCalledWith( + 'worker_data_url.update', + { state: JSON.stringify(newState) }, + { + headers: { Authorization: 'service_account_token' }, + params: { sync_unit: 'sync_unit_id' }, + } + ); + }); + + it('should handle non-404 error in fetchState', async () => { + const error = new Error('Network error'); + mockedAxios.post.mockRejectedValue(error); + + const state = await adapterState.fetchState(initialState); + + expect(state).toBe(error); + expect(mockedAxios.post).toHaveBeenCalledWith( + 'worker_data_url.get', + {}, + { + headers: { Authorization: 'service_account_token' }, + params: { sync_unit: 'sync_unit_id' }, + } + ); + }); + }); +}); diff --git a/tests/test-helpers.ts b/tests/test-helpers.ts deleted file mode 100644 index 1b393d0..0000000 --- a/tests/test-helpers.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { - Context, - ExecutionMetadata, -} from '@devrev/typescript-sdk/dist/snap-ins'; -import { EventType, AirdropEvent } from '../src/types'; - -export function createAirdropEvent( - event_type: EventType, - overrides?: object -): AirdropEvent { - return { - execution_metadata: { - devrev_endpoint: 'devrev_endpoint', - } as unknown as ExecutionMetadata, - context: { - secrets: { - service_account_token: 'service_account_token', - }, - } as unknown as Context, - payload: { - connection_data: { - org_id: 'org_id', - org_name: 'org_name', - key: 'key', - key_type: 'key_type', - }, - event_context: { - mode: 'mode', - callback_url: 'callback_url', - dev_org_id: 'dev_org_id', - dev_user_id: 'dev_user_id', - external_system_id: 'external_system_id', - uuid: 'uuid', - sync_run_id: 'sync_run_id', - }, - event_type, - }, - input_data: { - global_values: {}, - event_sources: {}, - }, - ...overrides, - }; -} \ No newline at end of file diff --git a/tests/types.test.ts b/tests/types.test.ts deleted file mode 100644 index 987858b..0000000 --- a/tests/types.test.ts +++ /dev/null @@ -1,86 +0,0 @@ -import { - ExternalSyncUnit, - EventContextIn, - ConnectionData, - ErrorRecord, - Artifact, -} from '../src/types'; - -describe('Airdrop event types', () => { - describe('ExternalSyncUnit', () => { - test('should have the required properties', () => { - const externalSyncUnit: ExternalSyncUnit = { - id: '123', - name: 'test-unit-name', - description: 'test-unit-description', - item_count: 0, - }; - - expect(externalSyncUnit).toHaveProperty('id'); - expect(externalSyncUnit).toHaveProperty('name'); - expect(externalSyncUnit).toHaveProperty('description'); - expect(externalSyncUnit).toHaveProperty('item_count'); - }); - }); - - describe('EventContextIn', () => { - test('should have the required properties', () => { - const eventContext: EventContextIn = { - mode: 'INITIAL', - callback_url: 'https://test.com', - dev_org_id: 'DEV-123', - dev_user_id: 'DEVU-123', - external_system_id: '123', - sync_run_id: '123', - uuid: '123', - }; - - expect(eventContext).toHaveProperty('mode'); - expect(eventContext).toHaveProperty('callback_url'); - expect(eventContext).toHaveProperty('dev_org_id'); - expect(eventContext).toHaveProperty('dev_user_id'); - expect(eventContext).toHaveProperty('external_system_id'); - expect(eventContext).toHaveProperty('uuid'); - }); - }); - - describe('ConnectionData', () => { - test('should have the required properties', () => { - const connectionData: ConnectionData = { - org_id: '123', - org_name: 'test-org-name', - key: 'test-key', - key_type: 'test-key-type', - }; - - expect(connectionData).toHaveProperty('org_id'); - expect(connectionData).toHaveProperty('org_name'); - expect(connectionData).toHaveProperty('key'); - expect(connectionData).toHaveProperty('key_type'); - }); - }); - - describe('ErrorRecord', () => { - test('should have the required properties', () => { - const errorRecord: ErrorRecord = { - message: 'test-message', - }; - - expect(errorRecord).toHaveProperty('message'); - }); - }); - - describe('Artifact', () => { - test('should have the required properties', () => { - const artifact: Artifact = { - item_count: 0, - id: '123', - item_type: 'test-item-type', - }; - - expect(artifact).toHaveProperty('item_type'); - expect(artifact).toHaveProperty('id'); - expect(artifact).toHaveProperty('item_count'); - }); - }); -}); \ No newline at end of file diff --git a/tests/uploader.test.ts b/tests/uploader.test.ts new file mode 100644 index 0000000..01e21f4 --- /dev/null +++ b/tests/uploader.test.ts @@ -0,0 +1,36 @@ +import { Uploader } from '../src/uploader'; + +// mock uploader.upload method +jest.mock('../src/uploader', () => { + return { + Uploader: jest.fn().mockImplementation(() => { + return { + upload: jest.fn().mockResolvedValue({ + artifact: { key: 'value' }, + error: undefined, + }), + }; + }), + }; +}); + +describe('uploader.ts', () => { + const uploader = new Uploader('https://example.com', 'test-token', false); + + it('should upload the file to the DevRev platform and return the artifact information', async () => { + const filename = 'filename'; + const entity = 'entity'; + const fetchedObjects = [{ key: 'value' }]; + + const uploadResponse = await uploader.upload( + filename, + entity, + fetchedObjects + ); + + expect(uploadResponse).toEqual({ + artifact: { key: 'value' }, + error: undefined, + }); + }); +});