Skip to content

Commit

Permalink
Unified incoming and outgoing event context. Added dev_oid to logger …
Browse files Browse the repository at this point in the history
  • Loading branch information
radovan-jorgic authored Dec 17, 2024
1 parent aada8f9 commit baf79e1
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 35 deletions.
81 changes: 69 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Release Notes

#### v1.1.2

- Unified incoming and outgoing event context.
- Added `dev_oid` to logger tags.

#### v1.1.1

- Added default workers for loading deletion events.
Expand Down Expand Up @@ -54,14 +59,14 @@

# Overview

The ADaaS (Airdrop-as-a-Service) Library for TypeScript helps developers build Snap-ins that integrate with DevRev’s ADaaS platform. This library simplifies the workflow for handling data extraction, event-driven actions, state management, and artifact handling.
The ADaaS (Airdrop-as-a-Service) Library for TypeScript helps developers build Snap-ins that integrate with DevRev’s ADaaS platform. This library simplifies the workflow for handling data extraction and loading, event-driven actions, state management, and artifact handling.

## Features
It provides features such as:

- Type Definitions: Structured types for ADaaS control protocol
- Event Management: Easily emit events for different extraction phases
- Event Management: Easily emit events for different extraction or loading phases
- State Handling: Update and access state in real-time within tasks
- Artifact Management: Supports batched storage of artifacts (2000 items per batch)
- Artifact Management: Supports batched storage of artifacts
- Error & Timeout Support: Error handling and timeout management for long-running tasks

# Installation
Expand All @@ -72,7 +77,22 @@ npm install @devrev/ts-adaas

# Usage

ADaaS Snap-ins are composed of several phases, each with unique requirements for initialization, data extraction, and error handling. The ADaaS library exports processTask to structure the work within each phase. The processTask function accepts task and onTimeout handlers, giving access to the adapter to streamline state updates, upload of extracted data, and event emission.
ADaaS Snap-ins can import data in both directions: from external sources to DevRev and from DevRev to external sources. Both directions are composed of several phases.

From external source to DevRev:

- External Sync Units Extraction
- Metadata Extraction
- Data Extraction
- Attachments Extraction

From DevRev to external source:

- Data Loading

Each phase comes with unique requirements for processing task, and both timeout and error handling.

The ADaaS library exports processTask to structure the work within each phase, and onTimeout function to handle timeouts.

### ADaaS Snap-in Invocation

Expand Down Expand Up @@ -127,10 +147,14 @@ const run = async (events: AirdropEvent[]) => {
export default run;
```

## Extraction Phases
## Extraction

The ADaaS snap-in extraction lifecycle consists of three main phases: External Sync Units Extraction, Metadata Extraction, and Data Extraction. Each phase is defined in a separate file and is responsible for fetching the respective data.

The ADaaS library provides a repository management system to handle artifacts in batches. The `initializeRepos` function initializes the repositories, and the `push` function uploads the artifacts to the repositories. The `postState` function is used to post the state of the extraction task.

State management is crucial for ADaaS Snap-ins to maintain the state of the extraction task. The `postState` function is used to post the state of the extraction task. The state is stored in the adapter and can be retrieved using the `adapter.state` property.

### 1. External Sync Units Extraction

This phase is defined in `external-sync-units-extraction.ts` and is responsible for fetching the external sync units.
Expand Down Expand Up @@ -243,7 +267,7 @@ processTask({
});
```

## 4. Attachments Streaming
### 4. Attachments Streaming

The ADaaS library handles attachments streaming to improve efficiency and reduce complexity for developers. During the extraction phase, developers need only to provide metadata in a specific format for each attachment, and the library manages the streaming process.

Expand All @@ -259,12 +283,45 @@ export interface NormalizedAttachment {
}
```

## Artifact Uploading and State Management
## Loading phases

The ADaaS library provides a repository management system to handle artifacts in batches. The `initializeRepos` function initializes the repositories, and the `push` function uploads the artifacts to the repositories. The `postState` function is used to post the state of the extraction task.
### 1. Data Loading

State management is crucial for ADaaS Snap-ins to maintain the state of the extraction task. The `postState` function is used to post the state of the extraction task. The state is stored in the adapter and can be retrieved using the `adapter.state` property.
This phase is defined in `data-loading.ts` and is responsible for loading the data to the external system.

Loading is done by providing an ordered list of itemTypes to load and their respective create and update functions.

```typescript
processTask({
task: async ({ adapter }) => {
const { reports, processed_files } = await adapter.loadItemTypes({
itemTypesToLoad: [
{
itemType: 'tickets',
create: createTicket,
update: updateTicket,
},
{
itemType: 'conversations',
create: createConversation,
update: updateConversation,
},
],
});

await adapter.emit(LoaderEventType.DataLoadingDone, {
reports,
processed_files,
});
},
onTimeout: async ({ adapter }) => {
await adapter.emit(LoaderEventType.DataLoadingProgress, {
reports: adapter.reports,
processed_files: adapter.processedFiles,
});
});
```
## Timeout Handling
The loading functions `create` and `update` provide loading to the external system. They provide denormalization of the records to the schema of the external system and provide HTTP calls to the external system. Both loading functions must handle rate limiting for the external system and handle errors.
The ADaaS library provides a timeout handler to handle timeouts in long-running tasks. The `onTimeout` handler is called when the task exceeds the timeout limit. The handler can be used to post the state of the extraction task and emit an event when a timeout occurs.
Functions return an ID and modified date of the record in the external system, or specify rate-liming offset or errors, if the record could not be created or updated.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@devrev/ts-adaas",
"version": "1.1.1",
"version": "1.1.2",
"description": "Typescript library containing the ADaaS(AirDrop as a Service) control protocol.",
"type": "commonjs",
"main": "./dist/index.js",
Expand Down
8 changes: 1 addition & 7 deletions src/common/control-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,7 @@ export const emit = async ({
}: EmitInterface): Promise<void | Error> => {
const newEvent: ExtractorEvent | LoaderEvent = {
event_type: eventType,
event_context: {
uuid: event.payload.event_context.uuid,
sync_run: event.payload.event_context.sync_run_id,
...(event.payload.event_context.sync_unit_id && {
sync_unit: event.payload.event_context.sync_unit_id,
}),
},
event_context: event.payload.event_context,
event_data: {
...data,
},
Expand Down
8 changes: 1 addition & 7 deletions src/deprecated/adapter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,7 @@ export class Adapter<ConnectorState> {

const newEvent: ExtractorEvent = {
event_type: newEventType,
event_context: {
uuid: this.event.payload.event_context.uuid,
sync_run: this.event.payload.event_context.sync_run_id,
...(this.event.payload.event_context.sync_unit_id && {
sync_unit: this.event.payload.event_context.sync_unit_id,
}),
},
event_context: this.event.payload.event_context,
event_data: {
...data,
},
Expand Down
4 changes: 1 addition & 3 deletions src/logger/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,20 @@ import {
import { isMainThread, parentPort } from 'node:worker_threads';
import { WorkerAdapterOptions, WorkerMessageSubject } from '../types/workers';
import { AxiosError } from 'axios';
import { AirdropEvent } from '../types/extraction';

export class Logger extends Console {
private event: AirdropEvent;
private options?: WorkerAdapterOptions;

constructor({ event, options }: LoggerFactoryInterface) {
super(process.stdout, process.stderr);
this.event = event;
this.options = options;

log.options.levelKey = null;
log.options.tagsKey = null;
log.options.messageKey = 'message';
log.options.meta = {
...event.payload.event_context,
dev_oid: event.payload.event_context.dev_org,
};
}

Expand Down
35 changes: 32 additions & 3 deletions src/types/extraction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export interface ExternalSyncUnit {

/**
* EventContextIn is an interface that defines the structure of the input event context that is sent to the external extractor from ADaaS.
* @deprecated
*/
export interface EventContextIn {
callback_url: string;
Expand Down Expand Up @@ -116,13 +117,41 @@ export interface EventContextIn {

/**
* EventContextOut is an interface that defines the structure of the output event context that is sent from the external extractor to ADaaS.
* @deprecated
*/
export interface EventContextOut {
uuid: string;
sync_run: string;
sync_unit?: string;
}

/**
* EventContext is an interface that defines the structure of the event context that is sent to and from the external connector.
*/
export interface EventContext {
callback_url: string;
dev_org: string;
dev_org_id: string;
dev_user: string;
dev_user_id: string;
external_sync_unit: string;
external_sync_unit_id: string;
external_sync_unit_name: string;
external_system: string;
external_system_type: string;
import_slug: string;
mode: string;
request_id: string;
snap_in_slug: string;
sync_run: string;
sync_run_id: string;
sync_tier: string;
sync_unit: DonV2;
sync_unit_id: string;
uuid: string;
worker_data_url: string;
}

/**
* ConnectionData is an interface that defines the structure of the connection data that is sent to the external extractor from ADaaS.
* It contains the organization ID, organization name, key, and key type.
Expand Down Expand Up @@ -192,7 +221,7 @@ export interface AirdropEvent {
*/
export interface AirdropMessage {
connection_data: ConnectionData;
event_context: EventContextIn;
event_context: EventContext;
event_type: EventType;
event_data?: EventData;
}
Expand All @@ -203,7 +232,7 @@ export interface AirdropMessage {
*/
export interface ExtractorEvent {
event_type: string;
event_context: EventContextOut;
event_context: EventContext;
event_data?: EventData;
}

Expand All @@ -212,6 +241,6 @@ export interface ExtractorEvent {
*/
export interface LoaderEvent {
event_type: string;
event_context: EventContextOut;
event_context: EventContext;
event_data?: EventData;
}

0 comments on commit baf79e1

Please sign in to comment.