Skip to content

Commit

Permalink
Initial work on new processItem function
Browse files Browse the repository at this point in the history
  • Loading branch information
jhweir committed Jan 15, 2025
1 parent 7dd82c5 commit 7c1a52c
Showing 1 changed file with 102 additions and 28 deletions.
130 changes: 102 additions & 28 deletions packages/utils/src/synergy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import {
import JSON5 from "json5";

async function removeEmbedding(perspective, itemId) {
const allRelationships = (await SemanticRelationship.query(perspective, {
const allSemanticRelationships = (await SemanticRelationship.query(perspective, {
source: itemId,
})) as any;
const embeddingRelationship = allRelationships.find((r) => !r.relevance);
if (embeddingRelationship) {
const embedding = new Embedding(perspective, embeddingRelationship.tag);
await embedding.delete();
await embeddingRelationship.delete();
const embeddingSR = allSemanticRelationships.find((sr) => !sr.relevance);
if (embeddingSR) {
const embedding = new Embedding(perspective, embeddingSR.tag);
await embedding.delete(); // delete the embedding
await embeddingSR.delete(); // delete the semantic relationship
}
}

Expand All @@ -48,10 +48,7 @@ async function removeTopics(perspective, itemId) {
}

async function removeProcessedData(perspective, itemId) {
return await Promise.all([
removeEmbedding(perspective, itemId),
removeTopics(perspective, itemId),
]);
return await Promise.all([removeEmbedding(perspective, itemId), removeTopics(perspective, itemId)]);
}

// todo: use embedding language instead of stringifying
Expand Down Expand Up @@ -276,13 +273,102 @@ export async function getSubgroupItems(perspective, subgroupId) {
});
}

async function removeProcessedDataIfPresent(perspective, itemId) {
// check if there are existing semantic relationships on the item & remove if present (used for edits & unprocessed items)
const relationships = await SemanticRelationship.query(perspective, { source: itemId });
if (relationships.length) await removeProcessedData(perspective, itemId);
}

// todo: use raw prolog query here so subject classes don't need to be hard coded
async function getConversationItems(perspective, conversationId) {
// gather up all the items in the conversation
const messages = await new SubjectRepository(Message, { perspective, source: conversationId }).getAllData();
const posts = await new SubjectRepository(Post, { perspective, source: conversationId }).getAllData();
const tasks = await new SubjectRepository("Task", { perspective, source: conversationId }).getAllData();
// transform items into common format
const transformedItems = [
...messages.map((message) => transformItem("Message", message)),
...posts.map((post) => transformItem("Post", post)),
...tasks.map((task) => transformItem("Task", task)),
];
// order items by timestamp
const orderedItems = transformedItems.sort((a, b) => {
return new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime();
});
return orderedItems;
}

async function linkItemToConversation(perspective, channelId, itemId) {
let conversation;
const conversations = await Conversation.query(perspective, { source: channelId });
if (conversations.length) {
// if existing conversations found & last item in last conversation less than 30 mins old, use that conversation
const lastConversation = conversations[conversations.length - 1];
const conversationItems = await getConversationItems(perspective, lastConversation.baseExpression);
if (conversationItems.length) {
const lastItem = conversationItems[conversationItems.length - 1];
const timeSinceLastItemCreated = new Date().getTime() - new Date(lastItem.timestamp).getTime();
const minsSinceLastItemCreated = timeSinceLastItemCreated / (1000 * 60);
if (minsSinceLastItemCreated < 30) conversation = lastConversation;
}
}
if (!conversation) {
// create a new conversation
const newConversation = new Conversation(perspective, undefined, channelId);
newConversation.conversationName = `Conversation ${conversations.length + 1}`;
await newConversation.save();
conversation = await newConversation.get();
}
// link item to conversation
await perspective.add({
source: conversation.baseExpression,
predicate: "ad4m://has_child",
target: itemId,
});
}

async function getDefaultLLM() {
const client = await getAd4mClient();
return await client.ai.getDefaultModel("LLM");
}

const processingQueue = [] as any[];
let isProcessing = false;

// async function maybeProcessNext() {
// if (isProcessing || !processingQueue.length) return;
// isProcessing = true;

// const next = processingQueue.shift();
// try {
// await processItemOld(next.perspective, next.channelId, next.item, next.existingItem);
// } finally {
// isProcessing = false;
// // Attempt to process the next in queue
// maybeProcessNext();
// }
// }

export async function processItem(perspective, channelId, item, existingItem?: boolean) {
return new Promise(async (resolve: any) => {
await removeProcessedDataIfPresent(perspective, item.baseExpression);
await linkItemToConversation(perspective, channelId, item.baseExpression);
// if no default LLM, end processing here
if (!(await getDefaultLLM())) resolve();
else {
await createEmbedding(perspective, item.text, item.baseExpression);
// check if responsible for LLM task...
}
});
}

export async function processItemOld(perspective, channelId, item, existingItem?: boolean) {
// check if LLM is enabled
const client = await getAd4mClient();
const defaultLLM = await client.ai.getDefaultModel("LLM");
console.log("process item with LLM: ", !!defaultLLM);
return new Promise(async (resolve: any) => {
// check for existing relationships & removed processed data if present (used for edits & unprocessed items)
// check for existing relationships & remove processed data if present (used for edits & unprocessed items)
const relationships = await SemanticRelationship.query(perspective, {
source: item.baseExpression,
});
Expand All @@ -296,6 +382,7 @@ export async function processItem(perspective, channelId, item, existingItem?: b
});
if (conversations.length) {
if (existingItem) {
// todo: use raw prolog query here
// find the conversation & subgroup of the existing item
const allConversations = await Conversation.all(perspective);
const allSubgroups = await ConversationSubgroup.all(perspective);
Expand All @@ -305,9 +392,7 @@ export async function processItem(perspective, channelId, item, existingItem?: b
target: item.baseExpression,
})
);
const subgroup = allSubgroups.find((s) =>
itemLinks.find((link) => link.data.source === s.baseExpression)
);
const subgroup = allSubgroups.find((s) => itemLinks.find((link) => link.data.source === s.baseExpression));
const subgroupLinks = await perspective.get(
new LinkQuery({
predicate: "ad4m://has_child",
Expand All @@ -325,10 +410,7 @@ export async function processItem(perspective, channelId, item, existingItem?: b
source: latestConversation.baseExpression,
});
const latestSubgroup = subgroups[subgroups.length - 1] as any;
const latestSubgroupItems = await getSubgroupItems(
perspective,
latestSubgroup.baseExpression
);
const latestSubgroupItems = await getSubgroupItems(perspective, latestSubgroup.baseExpression);
// calculate time since last item was created
const lastItem = latestSubgroupItems[latestSubgroupItems.length - 1];
if (lastItem) {
Expand All @@ -349,11 +431,7 @@ export async function processItem(perspective, channelId, item, existingItem?: b
await newConversation.save();
conversation = await newConversation.get();
// create new subgroup
const newSubgroup = new ConversationSubgroup(
perspective,
undefined,
conversation.baseExpression
);
const newSubgroup = new ConversationSubgroup(perspective, undefined, conversation.baseExpression);
newSubgroup.subgroupName = "Subgroup 1";
await newSubgroup.save();
subgroups = [newSubgroup];
Expand Down Expand Up @@ -397,11 +475,7 @@ export async function processItem(perspective, channelId, item, existingItem?: b
// update subgroup summary and title
if (changedSubject && subgroup.summary) {
// if the conversation has shifted to a new subject, create a new subgroup
const newSubgroup = new ConversationSubgroup(
perspective,
undefined,
conversation.baseExpression
);
const newSubgroup = new ConversationSubgroup(perspective, undefined, conversation.baseExpression);
newSubgroup.subgroupName = newSubgroupName;
newSubgroup.summary = newSubgroupSummary;
await newSubgroup.save();
Expand Down

0 comments on commit 7c1a52c

Please sign in to comment.