Skip to content

Commit

Permalink
Added sync direction parameter to defineSyncIntegration and defineSyn…
Browse files Browse the repository at this point in the history
…cAction
  • Loading branch information
ebouck committed Nov 21, 2024
1 parent c7ce57f commit d359a75
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 102 deletions.
5 changes: 5 additions & 0 deletions .changeset/poor-mugs-doubt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@runlightyear/lightyear": minor
---

Added sync direction parameter to defineSyncIntegration and defineSyncAction
3 changes: 2 additions & 1 deletion packages/@runlightyear/lightyear/src/base/syncAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export interface DefineSyncActionProps {
full?: number;
hardDelete?: number;
};
direction?: "pull" | "push" | "bidirectional";
}

function isConnectorClass(
Expand Down Expand Up @@ -156,7 +157,7 @@ export function defineSyncAction(props: DefineSyncActionProps) {
console.info(`Updated sync type to ${syncType}`);

try {
await synchronizer.sync(sync.id);
await synchronizer.sync(sync.id, props.direction);
await updateSync({
collection: props.collection,
syncId: sync.id,
Expand Down
2 changes: 2 additions & 0 deletions packages/@runlightyear/lightyear/src/base/syncIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export interface DefineSyncIntegrationProps {
full?: number;
hardDelete?: number;
};
direction?: "pull" | "push" | "bidirectional";
}

function isConnectorClass(
Expand Down Expand Up @@ -76,6 +77,7 @@ export function defineSyncIntegration(props: DefineSyncIntegrationProps) {
collection: props.collection,
synchronizer: props.synchronizer,
frequency: props.frequency,
direction: props.direction,
}),
],
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,17 @@ export abstract class CollectionSynchronizer {
return modelSynchronizer(this.getModelSynchronizerProps(name));
}

async sync(syncId: string) {
async sync(
syncId: string,
direction: "push" | "pull" | "bidirectional" = "bidirectional"
) {
const modelsToSync = await this.getModelOrder();

for (const modelName of modelsToSync) {
const model = await this.getModel(modelName);

if (model) {
await model.sync(syncId);
await model.sync(syncId, direction);
}
}
}
Expand Down
206 changes: 107 additions & 99 deletions packages/@runlightyear/lightyear/src/synchronizers/ModelSynchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ export abstract class ModelSynchronizer<T> {
return external;
}

async sync(syncId: string) {
async sync(
syncId: string,
direction: "pull" | "push" | "bidirectional" = "bidirectional"
) {
const authData = this.connector.getAuthData();
if (!authData) {
throw new Error("Must have auth to sync");
Expand All @@ -175,112 +178,117 @@ export abstract class ModelSynchronizer<T> {

let listCounter = 0;

do {
const listResponse: {
objects: Array<FullObjectProps<T>>;
cursor?: string;
} = await this.list({ syncType, lastUpdatedAt, cursor });
objects = listResponse.objects;
cursor = listResponse.cursor;

// for (const obj of objects) {
// if (obj.isDeleted) {
// await this.delete(obj.id);
// } else {
// console.log("Skipping upsert for now");
if (objects.length === 0) {
console.info("Nothing to upsert");
} else {
await upsertObjectBatch({
collection: this.collection,
syncId,
model: this.model,
app: authData.appName ?? undefined,
customApp: authData.customAppName ?? undefined,
objects: objects.map((obj) => ({
managedUserId,
localObjectId: obj.id,
localUpdatedAt: obj.updatedAt,
data: obj.data,
})),
async: true,
});
console.info("Upserted batch");
// lastUpdatedAt = obj.updatedAt;
// console.log("lastUpdatedAt", lastUpdatedAt);
listCounter += objects.length;
console.info("Objects processed:", listCounter);
// }
// }
}
} while (cursor);

console.info("Processing delta");
let more;
let changeCounter = 0;
do {
const delta = await getDelta({
collection: this.collection,
managedUserExternalId: authData.managedUser?.externalId ?? null,
app: authData.appName,
customApp: authData.customAppName,
model: this.model,
});
more = delta.more;

console.debug("Delta:", delta);

for (const change of delta.changes) {
if (change.operation === "CREATE") {
const newObjectId = await this.create({
data: change.data,
});

const newObject = await this.get(newObjectId);

await upsertObject({
if (direction === "pull" || direction === "bidirectional") {
console.info("Pulling");
do {
const listResponse: {
objects: Array<FullObjectProps<T>>;
cursor?: string;
} = await this.list({ syncType, lastUpdatedAt, cursor });
objects = listResponse.objects;
cursor = listResponse.cursor;

// for (const obj of objects) {
// if (obj.isDeleted) {
// await this.delete(obj.id);
// } else {
// console.log("Skipping upsert for now");
if (objects.length === 0) {
console.info("Nothing to upsert");
} else {
await upsertObjectBatch({
collection: this.collection,
syncId,
model: this.model,
app: authData.appName ?? undefined,
customApp: authData.customAppName ?? undefined,
objectId: change.objectId,
managedUserId: authData.managedUser?.externalId ?? null,
localObjectId: newObject.id,
localUpdatedAt: newObject.updatedAt,
data: newObject.data,
});
} else if (change.operation === "UPDATE") {
await this.update({
id: change.localObjectId,
data: change.data,
objects: objects.map((obj) => ({
managedUserId,
localObjectId: obj.id,
localUpdatedAt: obj.updatedAt,
data: obj.data,
})),
async: true,
});
console.info("Upserted batch");
// lastUpdatedAt = obj.updatedAt;
// console.log("lastUpdatedAt", lastUpdatedAt);
listCounter += objects.length;
console.info("Objects processed:", listCounter);
// }
// }
}
} while (cursor);
}

const updatedObject = await this.get(change.localObjectId);

await upsertObject({
collection: this.collection,
syncId,
model: this.model,
app: authData.appName ?? undefined,
customApp: authData.customAppName ?? undefined,
managedUserId: authData.managedUser?.externalId ?? null,
localObjectId: updatedObject.id,
localUpdatedAt: updatedObject.updatedAt,
data: updatedObject.data,
});
// } else if (change.operation === "DELETE") {
// await this.delete(change.objectId);
// await deleteObject({
// collection: this.collection,
// model: this.model,
// externalId: change.objectId,
// });
if (direction === "push" || direction === "bidirectional") {
console.info("Pushing");
let more;
let changeCounter = 0;
do {
const delta = await getDelta({
collection: this.collection,
managedUserExternalId: authData.managedUser?.externalId ?? null,
app: authData.appName,
customApp: authData.customAppName,
model: this.model,
});
more = delta.more;

console.debug("Delta:", delta);

for (const change of delta.changes) {
if (change.operation === "CREATE") {
const newObjectId = await this.create({
data: change.data,
});

const newObject = await this.get(newObjectId);

await upsertObject({
collection: this.collection,
syncId,
model: this.model,
app: authData.appName ?? undefined,
customApp: authData.customAppName ?? undefined,
objectId: change.objectId,
managedUserId: authData.managedUser?.externalId ?? null,
localObjectId: newObject.id,
localUpdatedAt: newObject.updatedAt,
data: newObject.data,
});
} else if (change.operation === "UPDATE") {
await this.update({
id: change.localObjectId,
data: change.data,
});

const updatedObject = await this.get(change.localObjectId);

await upsertObject({
collection: this.collection,
syncId,
model: this.model,
app: authData.appName ?? undefined,
customApp: authData.customAppName ?? undefined,
managedUserId: authData.managedUser?.externalId ?? null,
localObjectId: updatedObject.id,
localUpdatedAt: updatedObject.updatedAt,
data: updatedObject.data,
});
// } else if (change.operation === "DELETE") {
// await this.delete(change.objectId);
// await deleteObject({
// collection: this.collection,
// model: this.model,
// externalId: change.objectId,
// });
}
}
}

changeCounter += delta.changes.length;
console.info("Changes processed:", changeCounter);
} while (more);
changeCounter += delta.changes.length;
console.info("Changes processed:", changeCounter);
} while (more);
}
}
}

0 comments on commit d359a75

Please sign in to comment.