Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental sync for Attio provider #701

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions packages/api/scripts/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2900,6 +2900,20 @@ COMMENT ON COLUMN connections.token_type IS 'The type of the token, such as "Bea
COMMENT ON COLUMN connections.connection_token IS 'Connection token users will put in their header to identify which service / linked_User they make request for';


-- ************************************** vertical_objects_sync_track_data
CREATE TABLE vertical_objects_sync_track_data
(
id_vertical_objects_sync_track_data uuid NOT NULL,
vertical text NOT NULL,
provider_slug text NOT NULL,
object text NOT NULL,
pagination_type text NOT NULL,
id_connection uuid NOT NULL,
data json,
CONSTRAINT PK_vertical_objects_sync_track_data PRIMARY KEY ( id_vertical_objects_sync_track_data ),
CONSTRAINT FK_connection FOREIGN KEY ( id_connection ) REFERENCES connections ( id_connection )
);

-- ************************************** ats_scorecards
CREATE TABLE ats_scorecards
(
Expand Down
141 changes: 128 additions & 13 deletions packages/api/src/crm/company/services/attio/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import { EncryptionService } from '@@core/@core-services/encryption/encryption.s
import { ApiResponse } from '@@core/utils/types';
import { ICompanyService } from '@crm/company/types';
import { ServiceRegistry } from '../registry.service';
import { AttioCompanyInput, AttioCompanyOutput } from './types';
import { AttioCompanyInput, AttioCompanyOutput, paginationType } from './types';
import { SyncParam } from '@@core/utils/types/interface';
import { OriginalCompanyOutput } from '@@core/utils/types/original/original.crm';
import { v4 as uuidv4 } from 'uuid';

@Injectable()
export class AttioService implements ICompanyService {
Expand Down Expand Up @@ -74,20 +75,134 @@ export class AttioService implements ICompanyService {
vertical: 'crm',
},
});
const resp = await axios.post(
`${connection.account_url}/v2/objects/companies/records/query`,
{},
{
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`,
},

const paginationTrackInfo = await this.prisma.vertical_objects_sync_track_data.findFirst({
where: {
id_connection: connection.id_connection,
vertical: 'crm',
provider_slug: 'attio',
object: 'company',
},
);
});

let respData: AttioCompanyOutput[] = [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use const instead of let.

The variable respData is only assigned once during its initialization and is not reassigned throughout the code. Using const instead of let would ensure that respData cannot be accidentally reassigned, improving code quality and readability.

Apply this diff to use const:

-let respData = [];
+const respData = [];
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let respData: AttioCompanyOutput[] = [];
const respData: AttioCompanyOutput[] = [];
Tools
Biome

[error] 88-88: This let declares a variable that is only assigned once.

'respData' is never reassigned.

Safe fix: Use const instead.

(lint/style/useConst)


Consider removing the type annotation.

The type annotation AttioCompanyOutput[] is not necessary as TypeScript can infer the type of respData based on its initialization to an empty array []. Removing the type annotation would not affect the functionality of the code and would improve readability.

Apply this diff to remove the type annotation:

-let respData: AttioCompanyOutput[] = [];
+let respData = [];
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let respData: AttioCompanyOutput[] = [];
let respData = [];
Tools
Biome

[error] 88-88: This let declares a variable that is only assigned once.

'respData' is never reassigned.

Safe fix: Use const instead.

(lint/style/useConst)

let initialOffset: number = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider removing the type annotation.

The type annotation number is not necessary as TypeScript can infer the type of initialOffset based on its initialization to 0. Removing the type annotation would not affect the functionality of the code and would improve readability.

Apply this diff to remove the type annotation:

-let initialOffset: number = 0;
+let initialOffset = 0;
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let initialOffset: number = 0;
let initialOffset = 0;
Tools
Biome

[error] 89-89: This type annotation is trivially inferred from its initialization.

Safe fix: Remove the type annotation.

(lint/style/noInferrableTypes)


if (!paginationTrackInfo) {
// Intial sync
try {
while (true) {
const resp = await axios.post(
`${connection.account_url}/v2/objects/companies/records/query`,
{
"sorts": [
{
"attribute": "created_at",
"direction": "asc"
}
],
"offset": initialOffset,
"limit": 500
},
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`,
},
},
);


respData.push(...resp.data.data);
initialOffset = initialOffset + resp.data.data.length;

if (resp.data.data.length < 500) {
break;
}
}
}
catch (error) {
this.logger.log(`Error in initial sync ${error.message} and last offset is ${initialOffset}`);
}

}
else {
// Incremental sync
const currentPaginationData = paginationTrackInfo.data as paginationType;
initialOffset = currentPaginationData.offset;

try {
while (true) {
const resp = await axios.post(
`${connection.account_url}/v2/objects/companies/records/query`,
{
"sorts": [
{
"attribute": "created_at",
"direction": "asc"
}
],
"offset": initialOffset,
"limit": 500
},
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`
}
}
);

respData.push(...resp.data.data);
initialOffset = initialOffset + resp.data.data.length;

if (resp.data.data.length < 500) {
break;
}
}
}
catch (error) {
this.logger.log(`Error in incremental sync ${error.message} and last offset is ${initialOffset}`);
}
}

// create or update records
if (paginationTrackInfo) {
await this.prisma.vertical_objects_sync_track_data.update({
where: {
id_vertical_objects_sync_track_data: paginationTrackInfo.id_vertical_objects_sync_track_data,
},
data: {
data: {
offset: initialOffset,
},
},
});
}
else {
await this.prisma.vertical_objects_sync_track_data.create({
data: {
id_vertical_objects_sync_track_data: uuidv4(),
vertical: 'crm',
provider_slug: 'attio',
object: 'company',
pagination_type: 'offset',
id_connection: connection.id_connection,
data: {
offset: initialOffset,
},
},
});
}

return {
data: resp.data.data,
data: respData,
message: 'Attio companies retrieved',
statusCode: 200,
};
Expand Down
4 changes: 4 additions & 0 deletions packages/api/src/crm/company/services/attio/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ export interface AttioCompany {

export type AttioCompanyInput = Partial<AttioCompany>;
export type AttioCompanyOutput = AttioCompanyInput;

export type paginationType = {
offset: number;
};
140 changes: 126 additions & 14 deletions packages/api/src/crm/contact/services/attio/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import { ActionType, handle3rdPartyServiceError } from '@@core/utils/errors';
import { EncryptionService } from '@@core/@core-services/encryption/encryption.service';
import { ApiResponse } from '@@core/utils/types';
import { ServiceRegistry } from '../registry.service';
import { AttioContactInput, AttioContactOutput } from './types';
import { v4 as uuidv4 } from 'uuid';
import { AttioContactInput, AttioContactOutput, paginationType } from './types';
import { SyncParam } from '@@core/utils/types/interface';

@Injectable()
Expand Down Expand Up @@ -74,22 +75,133 @@ export class AttioService implements IContactService {
},
});

const resp = await axios.post(
`${connection.account_url}/v2/objects/people/records/query`,
{},
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`,
},
const paginationTrackInfo = await this.prisma.vertical_objects_sync_track_data.findFirst({
where: {
id_connection: connection.id_connection,
vertical: 'crm',
provider_slug: 'attio',
object: 'contact',
},
);
});

let respData: AttioContactOutput[] = [];
let initialOffset: number = 0;

if (!paginationTrackInfo) {
// Intial sync
try {
while (true) {
const resp = await axios.post(
`${connection.account_url}/v2/objects/people/records/query`,
{
"sorts": [
{
"attribute": "created_at",
"direction": "asc"
}
],
"offset": initialOffset,
"limit": 500
},
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`,
},
},
);


respData.push(...resp.data.data);
initialOffset = initialOffset + resp.data.data.length;

if (resp.data.data.length < 500) {
break;
}
}
}
catch (error) {
this.logger.log(`Error in initial sync ${error.message} and last offset is ${initialOffset}`);
}

Comment on lines +87 to +129
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial sync implementation looks good!

The code correctly performs an initial synchronization by repeatedly querying the Attio API with pagination until all records are retrieved. Error handling is in place to log any issues during the sync process.

Consider using const instead of let for respData as it is never reassigned:

-let respData: AttioContactOutput[] = [];
+const respData: AttioContactOutput[] = [];

This change adheres to the best practice of using const for variables that are not reassigned.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let respData: AttioContactOutput[] = [];
let initialOffset: number = 0;
if (!paginationTrackInfo) {
// Intial sync
try {
while (true) {
const resp = await axios.post(
`${connection.account_url}/v2/objects/people/records/query`,
{
"sorts": [
{
"attribute": "created_at",
"direction": "asc"
}
],
"offset": initialOffset,
"limit": 500
},
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`,
},
},
);
respData.push(...resp.data.data);
initialOffset = initialOffset + resp.data.data.length;
if (resp.data.data.length < 500) {
break;
}
}
}
catch (error) {
this.logger.log(`Error in initial sync ${error.message} and last offset is ${initialOffset}`);
}
const respData: AttioContactOutput[] = [];
let initialOffset: number = 0;
if (!paginationTrackInfo) {
// Intial sync
try {
while (true) {
const resp = await axios.post(
`${connection.account_url}/v2/objects/people/records/query`,
{
"sorts": [
{
"attribute": "created_at",
"direction": "asc"
}
],
"offset": initialOffset,
"limit": 500
},
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`,
},
},
);
respData.push(...resp.data.data);
initialOffset = initialOffset + resp.data.data.length;
if (resp.data.data.length < 500) {
break;
}
}
}
catch (error) {
this.logger.log(`Error in initial sync ${error.message} and last offset is ${initialOffset}`);
}
Tools
Biome

[error] 88-88: This type annotation is trivially inferred from its initialization.

Safe fix: Remove the type annotation.

(lint/style/noInferrableTypes)


[error] 87-87: This let declares a variable that is only assigned once.

'respData' is never reassigned.

Safe fix: Use const instead.

(lint/style/useConst)

}
else {
// Incremental sync
const currentPaginationData = paginationTrackInfo.data as paginationType;
initialOffset = currentPaginationData.offset;

try {
while (true) {
const resp = await axios.post(
`${connection.account_url}/v2/objects/people/records/query`,
{
"sorts": [
{
"attribute": "created_at",
"direction": "asc"
}
],
"offset": initialOffset,
"limit": 500
},
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${this.cryptoService.decrypt(
connection.access_token,
)}`
}
}
);

respData.push(...resp.data.data);
initialOffset = initialOffset + resp.data.data.length;

if (resp.data.data.length < 500) {
break;
}
}
}
catch (error) {
this.logger.log(`Error in incremental sync ${error.message} and last offset is ${initialOffset}`);
}
}

// create or update records
if (paginationTrackInfo) {
await this.prisma.vertical_objects_sync_track_data.update({
where: {
id_vertical_objects_sync_track_data: paginationTrackInfo.id_vertical_objects_sync_track_data,
},
data: {
data: {
offset: initialOffset,
},
},
});
}
else {
await this.prisma.vertical_objects_sync_track_data.create({
data: {
id_vertical_objects_sync_track_data: uuidv4(),
vertical: 'crm',
provider_slug: 'attio',
object: 'contact',
pagination_type: 'offset',
id_connection: connection.id_connection,
data: {
offset: initialOffset,
},
},
});
}

return {
data: resp.data.data,
data: respData,
message: 'Attio contacts retrieved',
statusCode: 200,
};
Expand Down
5 changes: 5 additions & 0 deletions packages/api/src/crm/contact/services/attio/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,8 @@ export interface AttioContact {

export type AttioContactInput = Partial<AttioContact>;
export type AttioContactOutput = AttioContactInput;


export type paginationType = {
offset: number;
};
Loading
Loading