👌 IMPROVE: Memory deployment #138

Open · wants to merge 12 commits into main
packages/baseai/package.json (2 changes: 1 addition & 1 deletion)
@@ -113,4 +113,4 @@
"langbase.com",
"generative AI"
]
}
}
packages/baseai/src/deploy/document.ts (38 changes: 22 additions & 16 deletions)
@@ -7,7 +7,7 @@ import {
handleError,
handleInvalidConfig,
listMemoryDocuments,
uploadDocumentsToMemory,
uploadDocumentsToMemory
} from '.';
import path from 'path';
import fs from 'fs/promises';
@@ -19,7 +19,10 @@ import {
} from '@/utils/memory/load-memory-files';
import type { MemoryI } from 'types/memory';
import { compareDocumentLists } from '@/utils/memory/compare-docs-list';
import { retrieveAuthentication, type Account } from '@/utils/retrieve-credentials';
import {
retrieveAuthentication,
type Account
} from '@/utils/retrieve-credentials';

type Spinner = ReturnType<typeof p.spinner>;

@@ -114,11 +117,18 @@ async function deployDocument({
process.exit(1);
}

// Fetch the existing documents
const prodDocs = await listMemoryDocuments({
account,
memoryName
});

await handleSingleDocDeploy({
memory: memoryObject,
account,
document,
overwrite
overwrite,
prodDocs
});

spinner.stop(
@@ -139,33 +149,29 @@ export async function handleSingleDocDeploy({
memory,
account,
document,
overwrite
overwrite,
prodDocs
}: {
memory: MemoryI;
account: Account;
document: MemoryDocumentI;
overwrite: boolean;
prodDocs: string[];
}) {
p.log.info(
`Checking "${memory.name}" memory for document "${document.name}".`
);

// Fetch the existing documents
const prodDocs = await listMemoryDocuments({
account,
memoryName: memory.name
});

// If overwrite is present, deploy.
if (overwrite) {
await uploadDocumentsToMemory({
account,
documents: [document],
name: memory.name
});
p.log.success(
`Document "${document.name}" uploaded to memory "${memory.name}".`
);
// p.log.success(
// `Document "${document.name}" uploaded to memory "${memory.name}".`
// );
return;
}

@@ -185,9 +191,9 @@ export async function handleSingleDocDeploy({
documents: [document],
name: memory.name
});
p.log.success(
`Document "${document.name}" uploaded to memory "${memory.name}".`
);
// p.log.success(
// `Document "${document.name}" uploaded to memory "${memory.name}".`
// );
return;
}

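Review note: `handleSingleDocDeploy` no longer fetches the production document list itself. `deployDocument` above (and the git-sync path in index.ts below) now call `listMemoryDocuments` once and thread the result through as the new `prodDocs` parameter, turning N list requests into one. A minimal TypeScript sketch of the shape of this refactor (`Doc`, `listDocs`, and `deployOne` are illustrative stand-ins for `MemoryDocumentI`, `listMemoryDocuments`, and `handleSingleDocDeploy`, not code from this PR):

// Sketch only, not code from this PR.
type Doc = { name: string };

async function deployAll(
    docs: Doc[],
    listDocs: () => Promise<string[]>,
    deployOne: (doc: Doc, prodDocs: string[]) => Promise<void>
): Promise<void> {
    // One round-trip total, instead of one per document.
    const prodDocs = await listDocs();
    for (const doc of docs) {
        await deployOne(doc, prodDocs);
    }
}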
packages/baseai/src/deploy/index.ts (138 changes: 94 additions & 44 deletions)
@@ -577,12 +577,6 @@ export async function upsertMemory({
p.log.info(
`Memory "${memory.name}" already exists. Updating changed documents.`
);
await handleGitSyncMemoryDeploy({
memory,
account,
documents,
overwrite
});

if (docsToDelete?.length > 0) {
await deleteDocumentsFromMemory({
@@ -592,6 +586,13 @@
});
}

await handleGitSyncMemoryDeploy({
memory,
account,
documents,
overwrite
});

await updateDeployedCommitHash(memory.name);

p.log.info(
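Review note: the ordering in `upsertMemory` flipped. `deleteDocumentsFromMemory` now runs before `handleGitSyncMemoryDeploy`, so documents that no longer exist locally are removed before the changed documents are uploaded, and `updateDeployedCommitHash` still runs only after both steps complete.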
@@ -643,24 +644,43 @@ export async function uploadDocumentsToMemory({
name: string;
account: Account;
}) {
for (const doc of documents) {
try {
p.log.message(`Uploading document: ${doc.name} ....`);
await new Promise(resolve => setTimeout(resolve, 800)); // To avoid rate limiting
const signedUrl = await getSignedUploadUrl({
documentName: doc.name,
memoryName: name,
account,
meta: doc.meta
});
const BATCH_SIZE = 5; // Number of concurrent uploads
const RATE_LIMIT_DELAY = 1500; // 1.5 second delay between requests

// Process documents in batches to avoid rate limiting
for (let i = 0; i < documents.length; i += BATCH_SIZE) {
const batch = documents.slice(i, i + BATCH_SIZE);

const batchUploadPromises = batch.map(async (doc, index) => {
try {
// Stagger requests within batch
await new Promise(resolve =>
setTimeout(resolve, index * RATE_LIMIT_DELAY)
);

const uploadResponse = await uploadDocument(signedUrl, doc.blob);
dlog(`Upload response status: ${uploadResponse.status}`);
// p.log.message(`Uploading document: ${doc.name} ....`);
const signedUrl = await getSignedUploadUrl({
documentName: doc.name,
memoryName: name,
account,
meta: doc.meta
});

p.log.message(`Uploaded document: ${doc.name}`);
} catch (error) {
throw error;
}
const uploadResponse = await uploadDocument(
signedUrl,
doc.blob
);
dlog(`Upload response status: ${uploadResponse.status}`);

p.log.message(`Uploaded document: ${doc.name}`);
} catch (error: any) {
throw new Error(
`Failed to upload ${doc.name}: ${error.message ?? error}`
);
}
});

await Promise.all(batchUploadPromises);
}
}
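The rewritten upload path processes documents in batches of five and staggers the requests inside each batch by `index * RATE_LIMIT_DELAY`, so the last request of a full batch starts 6 s after the first; `await Promise.all` then keeps the batches themselves sequential. The same loop shape is repeated in `deleteDocumentsFromMemory` and `handleGitSyncMemoryDeploy` below, so a shared helper could remove the duplication. A sketch of such a helper (hypothetical, not part of this PR):

// Hypothetical helper generalizing the batch-and-stagger pattern above.
async function processInBatches<T, R>(
    items: T[],
    worker: (item: T) => Promise<R>,
    batchSize = 5,
    delayMs = 1500
): Promise<R[]> {
    const results: R[] = [];
    for (let i = 0; i < items.length; i += batchSize) {
        const batch = items.slice(i, i + batchSize);
        const settled = await Promise.all(
            batch.map(async (item, index) => {
                // Item k in a batch starts k * delayMs after the batch begins.
                await new Promise(resolve =>
                    setTimeout(resolve, index * delayMs)
                );
                return worker(item);
            })
        );
        results.push(...settled);
    }
    return results;
}

With it, the loop above would collapse to something like `await processInBatches(documents, uploadOne)`, where `uploadOne` wraps the signed-URL fetch and upload for a single document.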

@@ -673,25 +693,37 @@ export async function deleteDocumentsFromMemory({
name: string;
account: Account;
}) {
p.log.info(`Deleting documents from memory: ${name}`);
const BATCH_SIZE = 5; // Number of concurrent uploads
const RATE_LIMIT_DELAY = 1500; // 1.5 second delay between requests

for (const doc of documents) {
try {
p.log.message(`Deleting document: ${doc} ....`);
await new Promise(resolve => setTimeout(resolve, 800)); // To avoid rate limiting
p.log.info(`Deleting ${documents.length} documents from memory: ${name}`);

const deleteResponse = await deleteDocument({
documentName: doc,
memoryName: name,
account
});
for (let i = 0; i < documents.length; i += BATCH_SIZE) {
const batch = documents.slice(i, i + BATCH_SIZE);
const batchPromises = batch.map(async (doc, index) => {
try {
await new Promise(resolve =>
setTimeout(resolve, index * RATE_LIMIT_DELAY)
);

dlog(`Delete response status: ${deleteResponse.status}`);
// p.log.message(`Deleting document: ${doc}`);
const deleteResponse = await deleteDocument({
documentName: doc,
memoryName: name,
account
});

p.log.message(`Deleted document: ${doc}`);
} catch (error) {
throw error;
}
dlog(`Delete response status: ${deleteResponse.status}`);
p.log.message(`Deleted document: ${doc}`);
return deleteResponse;
} catch (error: any) {
throw new Error(
`Failed to delete ${doc}: ${error.message ?? error}`
);
}
});

await Promise.all(batchPromises);
}
p.log.info(`Deleted documents from memory: ${name}`);
}
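One behavioral note on this loop (and the upload loop above): `Promise.all` rejects as soon as the first promise in a batch rejects, so one failed request aborts the deploy while its batch siblings are still in flight, and their outcomes are dropped. If reporting every failure matters, `Promise.allSettled` is the usual alternative; a sketch (hypothetical, assuming the `batchPromises` array built above):

// Hypothetical variant: collect every outcome instead of failing fast.
async function awaitBatch(batchPromises: Promise<unknown>[]): Promise<void> {
    const outcomes = await Promise.allSettled(batchPromises);
    const failures = outcomes.filter(
        (o): o is PromiseRejectedResult => o.status === 'rejected'
    );
    if (failures.length > 0) {
        throw new Error(`${failures.length} request(s) in the batch failed`);
    }
}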
@@ -1091,14 +1123,32 @@ export async function handleGitSyncMemoryDeploy({
documents: MemoryDocumentI[];
overwrite: boolean;
}) {
for (const doc in documents) {
await new Promise(resolve => setTimeout(resolve, 800)); // To avoid rate limiting
await handleSingleDocDeploy({
memory,
account,
document: documents[doc],
overwrite: true // TODO: Implement overwrite for git-sync memories
const BATCH_SIZE = 5;
const RATE_LIMIT_DELAY = 1500;

// Fetch existing documents once
const prodDocs = await listMemoryDocuments({
account,
memoryName: memory.name
});

// Process in batches
for (let i = 0; i < documents.length; i += BATCH_SIZE) {
const batch = documents.slice(i, i + BATCH_SIZE);
const batchPromises = batch.map(async (doc, index) => {
await new Promise(resolve =>
setTimeout(resolve, index * RATE_LIMIT_DELAY)
);
return handleSingleDocDeploy({
memory,
account,
document: doc,
overwrite: true,
prodDocs
});
});

await Promise.all(batchPromises);
}
}
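Aside on the replaced loop: the old code iterated with `for (const doc in documents)`, which walks array indices as strings (hence the `documents[doc]` lookup); the new `batch.map` yields the documents themselves. For reference:

// `for...in` yields index strings on arrays; `for...of` yields elements.
const docs = ['a.md', 'b.md'];
for (const key in docs) console.log(key); // "0", "1"
for (const doc of docs) console.log(doc); // "a.md", "b.md"

Note that `overwrite` is still hardcoded to `true` on the git-sync path; only the old TODO comment was dropped.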

packages/baseai/src/utils/memory/load-memory-files.ts (2 changes: 0 additions & 2 deletions)
@@ -65,8 +65,6 @@ export const loadMemoryFilesFromCustomDir = async ({
process.exit(1);
}

console.log('Reading documents in memory...');

// Get all files that match the glob patterns and are tracked by git
let allFiles: string[];
try {
packages/core/package.json (2 changes: 1 addition & 1 deletion)
@@ -119,4 +119,4 @@
"langbase.com",
"generative AI"
]
}
}