diff --git a/packages/utils/src/synergy.ts b/packages/utils/src/synergy.ts index e7dc44547..c75e4a11b 100644 --- a/packages/utils/src/synergy.ts +++ b/packages/utils/src/synergy.ts @@ -14,14 +14,14 @@ import { import JSON5 from "json5"; async function removeEmbedding(perspective, itemId) { - const allRelationships = (await SemanticRelationship.query(perspective, { + const allSemanticRelationships = (await SemanticRelationship.query(perspective, { source: itemId, })) as any; - const embeddingRelationship = allRelationships.find((r) => !r.relevance); - if (embeddingRelationship) { - const embedding = new Embedding(perspective, embeddingRelationship.tag); - await embedding.delete(); - await embeddingRelationship.delete(); + const embeddingSR = allSemanticRelationships.find((sr) => !sr.relevance); + if (embeddingSR) { + const embedding = new Embedding(perspective, embeddingSR.tag); + await embedding.delete(); // delete the embedding + await embeddingSR.delete(); // delete the semantic relationship } } @@ -48,10 +48,7 @@ async function removeTopics(perspective, itemId) { } async function removeProcessedData(perspective, itemId) { - return await Promise.all([ - removeEmbedding(perspective, itemId), - removeTopics(perspective, itemId), - ]); + return await Promise.all([removeEmbedding(perspective, itemId), removeTopics(perspective, itemId)]); } // todo: use embedding language instead of stringifying @@ -276,13 +273,102 @@ export async function getSubgroupItems(perspective, subgroupId) { }); } +async function removeProcessedDataIfPresent(perspective, itemId) { + // check if there are existing semantic relationships on the item & remove if present (used for edits & unprocessed items) + const relationships = await SemanticRelationship.query(perspective, { source: itemId }); + if (relationships.length) await removeProcessedData(perspective, itemId); +} + +// todo: use raw prolog query here so subject classes don't need to be hard coded +async function getConversationItems(perspective, conversationId) { + // gather up all the items in the conversation + const messages = await new SubjectRepository(Message, { perspective, source: conversationId }).getAllData(); + const posts = await new SubjectRepository(Post, { perspective, source: conversationId }).getAllData(); + const tasks = await new SubjectRepository("Task", { perspective, source: conversationId }).getAllData(); + // transform items into common format + const transformedItems = [ + ...messages.map((message) => transformItem("Message", message)), + ...posts.map((post) => transformItem("Post", post)), + ...tasks.map((task) => transformItem("Task", task)), + ]; + // order items by timestamp + const orderedItems = transformedItems.sort((a, b) => { + return new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime(); + }); + return orderedItems; +} + +async function linkItemToConversation(perspective, channelId, itemId) { + let conversation; + const conversations = await Conversation.query(perspective, { source: channelId }); + if (conversations.length) { + // if existing conversations found & last item in last conversation less than 30 mins old, use that conversation + const lastConversation = conversations[conversations.length - 1]; + const conversationItems = await getConversationItems(perspective, lastConversation.baseExpression); + if (conversationItems.length) { + const lastItem = conversationItems[conversationItems.length - 1]; + const timeSinceLastItemCreated = new Date().getTime() - new Date(lastItem.timestamp).getTime(); + const minsSinceLastItemCreated = timeSinceLastItemCreated / (1000 * 60); + if (minsSinceLastItemCreated < 30) conversation = lastConversation; + } + } + if (!conversation) { + // create a new conversation + const newConversation = new Conversation(perspective, undefined, channelId); + newConversation.conversationName = `Conversation ${conversations.length + 1}`; + await newConversation.save(); + conversation = await newConversation.get(); + } + // link item to conversation + await perspective.add({ + source: conversation.baseExpression, + predicate: "ad4m://has_child", + target: itemId, + }); +} + +async function getDefaultLLM() { + const client = await getAd4mClient(); + return await client.ai.getDefaultModel("LLM"); +} + +const processingQueue = [] as any[]; +let isProcessing = false; + +// async function maybeProcessNext() { +// if (isProcessing || !processingQueue.length) return; +// isProcessing = true; + +// const next = processingQueue.shift(); +// try { +// await processItemOld(next.perspective, next.channelId, next.item, next.existingItem); +// } finally { +// isProcessing = false; +// // Attempt to process the next in queue +// maybeProcessNext(); +// } +// } + export async function processItem(perspective, channelId, item, existingItem?: boolean) { + return new Promise(async (resolve: any) => { + await removeProcessedDataIfPresent(perspective, item.baseExpression); + await linkItemToConversation(perspective, channelId, item.baseExpression); + // if no default LLM, end processing here + if (!(await getDefaultLLM())) resolve(); + else { + await createEmbedding(perspective, item.text, item.baseExpression); + // check if responsible for LLM task... + } + }); +} + +export async function processItemOld(perspective, channelId, item, existingItem?: boolean) { // check if LLM is enabled const client = await getAd4mClient(); const defaultLLM = await client.ai.getDefaultModel("LLM"); console.log("process item with LLM: ", !!defaultLLM); return new Promise(async (resolve: any) => { - // check for existing relationships & removed processed data if present (used for edits & unprocessed items) + // check for existing relationships & remove processed data if present (used for edits & unprocessed items) const relationships = await SemanticRelationship.query(perspective, { source: item.baseExpression, }); @@ -296,6 +382,7 @@ export async function processItem(perspective, channelId, item, existingItem?: b }); if (conversations.length) { if (existingItem) { + // todo: use raw prolog query here // find the conversation & subgroup of the existing item const allConversations = await Conversation.all(perspective); const allSubgroups = await ConversationSubgroup.all(perspective); @@ -305,9 +392,7 @@ export async function processItem(perspective, channelId, item, existingItem?: b target: item.baseExpression, }) ); - const subgroup = allSubgroups.find((s) => - itemLinks.find((link) => link.data.source === s.baseExpression) - ); + const subgroup = allSubgroups.find((s) => itemLinks.find((link) => link.data.source === s.baseExpression)); const subgroupLinks = await perspective.get( new LinkQuery({ predicate: "ad4m://has_child", @@ -325,10 +410,7 @@ export async function processItem(perspective, channelId, item, existingItem?: b source: latestConversation.baseExpression, }); const latestSubgroup = subgroups[subgroups.length - 1] as any; - const latestSubgroupItems = await getSubgroupItems( - perspective, - latestSubgroup.baseExpression - ); + const latestSubgroupItems = await getSubgroupItems(perspective, latestSubgroup.baseExpression); // calculate time since last item was created const lastItem = latestSubgroupItems[latestSubgroupItems.length - 1]; if (lastItem) { @@ -349,11 +431,7 @@ export async function processItem(perspective, channelId, item, existingItem?: b await newConversation.save(); conversation = await newConversation.get(); // create new subgroup - const newSubgroup = new ConversationSubgroup( - perspective, - undefined, - conversation.baseExpression - ); + const newSubgroup = new ConversationSubgroup(perspective, undefined, conversation.baseExpression); newSubgroup.subgroupName = "Subgroup 1"; await newSubgroup.save(); subgroups = [newSubgroup]; @@ -397,11 +475,7 @@ export async function processItem(perspective, channelId, item, existingItem?: b // update subgroup summary and title if (changedSubject && subgroup.summary) { // if the conversation has shifted to a new subject, create a new subgroup - const newSubgroup = new ConversationSubgroup( - perspective, - undefined, - conversation.baseExpression - ); + const newSubgroup = new ConversationSubgroup(perspective, undefined, conversation.baseExpression); newSubgroup.subgroupName = newSubgroupName; newSubgroup.summary = newSubgroupSummary; await newSubgroup.save();