Skip to content

Commit

Permalink
Processing items signalling set up & unprocessed items fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jhweir committed Jan 22, 2025
1 parent df7c756 commit a454f77
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 68 deletions.
22 changes: 0 additions & 22 deletions app/src/views/community/CommunityView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ import {
import { Channel, Community } from "@coasys/flux-api";
import { useCommunities } from "@coasys/flux-vue";
import { mapActions } from "pinia";
import {
addSynergySignalHandler,
removeSynergySignalHandler,
} from "@coasys/flux-utils";
type LoadedChannels = {
[channelId: string]: boolean;
Expand Down Expand Up @@ -149,24 +145,6 @@ export default defineComponent({
subject: Channel,
});
// add synergy signal handler for each perspective (used to check if agents capable of processing conversation data)
watch(
() => data.value.perspective,
async (newPerspective: PerspectiveProxy | null) => {
if (newPerspective) {
await removeSynergySignalHandler(newPerspective);
await addSynergySignalHandler(newPerspective);
}
},
{ immediate: true }
);
// remove synergy signal handler when component unmounts
onUnmounted(async () => {
if (data.value.perspective)
await removeSynergySignalHandler(data.value.perspective);
});
return {
communities,
channels,
Expand Down
105 changes: 75 additions & 30 deletions packages/utils/src/synergy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,14 @@ export async function getDefaultLLM() {
return await client.ai.getDefaultModel("LLM");
}

export async function findUnprocessedItems(perspective: any, items: any[], conversations: any[]) {
const conversationIds = conversations.map((c) => c.baseExpression);
export async function findUnprocessedItems(perspective: any, items: any[], allSubgroupIds: string[]) {
const results = await Promise.all(
items.map(async (item) => {
const links = await perspective.get(
new LinkQuery({ predicate: "ad4m://has_child", target: item.baseExpression })
);
// if the item has a parent link to a conversation we know it has been processed
const isProcessed = links.some((link) => conversationIds.includes(link.data.source));
const isProcessed = links.some((link) => allSubgroupIds.includes(link.data.source));
return isProcessed ? null : item;
})
);
Expand All @@ -238,42 +237,48 @@ async function isMe(did: string) {

let receivedSignals: any[] = [];
let signalHandler: ((expression: PerspectiveExpression) => void) | null = null;
export const itemsBeingProcessed = [] as any[];

async function onSignalReceived(expression: PerspectiveExpression, neighbourhood: NeighbourhoodProxy) {
async function onSignalReceived(
expression: PerspectiveExpression,
neighbourhood: NeighbourhoodProxy,
setProcessing: any
) {
const link = expression.data.links[0];
const { author, data } = link;
const { predicate, target } = data;
const { source, predicate, target } = data;

if (predicate === "can-you-process-items") {
const defaultLLM = await getDefaultLLM();
if (defaultLLM) {
await neighbourhood.sendSignalU(author, { links: [{ source: "", predicate: "i-can-process-items", target }] });
}
// todo: respond if can't process items too?
// await neighbourhood.sendSignalU(author, {
// links: [
// {
// source: "", // channelId (not necissary?)
// predicate: `i-${defaultLLM ? "can" : "cant"}-process-items`,
// target,
// },
// ],
// });
console.log(`Signal recieved: can you process items? (${defaultLLM ? "yes" : "no"})`);
if (defaultLLM)
await neighbourhood.sendSignalU(author, {
links: [{ source: "", predicate: "i-can-process-items", target }],
});
}

if (predicate === "i-can-process-items") {
console.log("Signal recieved: remote agent can process items!");
console.log(`Signal recieved: remote agent ${author} can process items!`);
receivedSignals.push(link);
}

// // is this necissary (might be slightly quicker than waiting for timeout...)
// if (predicate === "i-cant-process-items") {
// }
if (predicate === "processing-items-started") {
const items = JSON.parse(target);
console.log(`Signal recieved: ${items.length} items being processed by ${author}`);
processing = true;
setProcessing({ author, channel: source, items });
}

if (predicate === "processing-items-finished") {
console.log(`Signal recieved: ${author} finished processing items`);
processing = false;
setProcessing(null);
}
}

export async function addSynergySignalHandler(perspective: PerspectiveProxy) {
export async function addSynergySignalHandler(perspective: PerspectiveProxy, setProcessing: any) {
const neighbourhood = await perspective.getNeighbourhoodProxy();
signalHandler = (expression: PerspectiveExpression) => onSignalReceived(expression, neighbourhood);
signalHandler = (expression: PerspectiveExpression) => onSignalReceived(expression, neighbourhood, setProcessing);
neighbourhood.addSignalHandler(signalHandler);
}

Expand Down Expand Up @@ -361,13 +366,28 @@ async function findOrCreateNewConversation(perspective: PerspectiveProxy, channe
// + let other agents know when you have started & finished processing (add new signal in responsibleForProcessing check?)
// + mark individual items as processing in UI
let processing = false;
async function processItemsAndAddToConversation(perspective, channelId, unprocessedItems) {
async function processItemsAndAddToConversation(
perspective,
neighbourhood,
channelId,
unprocessedItems,
setProcessing
) {
// update processing items state
processing = true;
const conversation: any = await findOrCreateNewConversation(perspective, channelId);
const itemIds = JSON.stringify(unprocessedItems.map((item) => item.baseExpression));
setProcessing({ author: "me", channel: channelId, items: itemIds });
// notify other agents that we are processing
await neighbourhood.sendBroadcastU({
links: [{ source: channelId, predicate: "processing-items-started", target: itemIds }],
});
// gather up all new perspective links so they can be commited in a single transaction at the end of the function
const newLinks = [] as any;
// gather up data for LLM processing
const previousSubgroups = await ConversationSubgroup.query(perspective, { source: conversation.baseExpression });
const conversation: any = await findOrCreateNewConversation(perspective, channelId);
const previousSubgroups = await ConversationSubgroup.query(perspective, {
source: conversation.baseExpression,
});
const lastSubgroup = previousSubgroups[previousSubgroups.length - 1] as any;
const lastSubgroupTopics = lastSubgroup ? await findTopics(perspective, lastSubgroup.baseExpression) : [];
const lastSubgroupWithTopics = lastSubgroup ? { ...lastSubgroup, topics: lastSubgroupTopics } : null;
Expand Down Expand Up @@ -478,24 +498,49 @@ async function processItemsAndAddToConversation(perspective, channelId, unproces
if (newSubgroup) await createEmbedding(perspective, newSubgroup.summary, newSubgroupEntity.baseExpression);
// batch commit all new links (currently only "ad4m://has_child" links)
await perspective.addLinks(newLinks);
// update processing items state
processing = false;
setProcessing(null);
// notify other agents
await neighbourhood.sendBroadcastU({
links: [{ source: channelId, predicate: "processing-items-finished", target: "" }],
});
}

export async function findAllChannelSubgroupIds(
perspective: PerspectiveProxy,
conversations: Conversation[]
): Promise<string[]> {
const subgroups = await Promise.all(
conversations.map((conversation) =>
ConversationSubgroup.query(perspective, { source: conversation.baseExpression })
)
);
return [...new Set(subgroups.flat().map((subgroup) => subgroup.baseExpression))];
}

export async function runProcessingCheck(perspective: PerspectiveProxy, channelId: string) {
export async function runProcessingCheck(perspective: PerspectiveProxy, channelId: string, setProcessing: any) {
console.log("runProcessingCheck");
// only attempt processing if default LLM is set
if (!(await getDefaultLLM())) return;

// check if we are responsible for processing
const channelItems = await getSynergyItems(perspective, channelId);
const conversations = (await Conversation.query(perspective, { source: channelId })) as any;
const unprocessedItems = await findUnprocessedItems(perspective, channelItems, conversations);
const allSubgroupIds = await findAllChannelSubgroupIds(perspective, conversations);
const unprocessedItems = await findUnprocessedItems(perspective, channelItems, allSubgroupIds);
const neighbourhood = await perspective.getNeighbourhoodProxy();
const responsible: boolean = await responsibleForProcessing(perspective, neighbourhood, channelId, unprocessedItems);
console.log("responsible for processing", responsible);
// if we are responsible, process items (minus delay) & add to conversation
if (responsible && !processing)
await processItemsAndAddToConversation(perspective, channelId, unprocessedItems.slice(0, -numberOfItemsDelay));
await processItemsAndAddToConversation(
perspective,
neighbourhood,
channelId,
unprocessedItems.slice(0, -numberOfItemsDelay),
setProcessing
);
}

export async function startTranscription(callback: (text) => void) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { useEffect, useState, useRef } from "preact/hooks";
import { closeMenu, getConversationData, groupingOptions } from "../../utils";
import { runProcessingCheck, getSynergyItems } from "@coasys/flux-utils";
import {
runProcessingCheck,
getSynergyItems,
itemsBeingProcessed,
addSynergySignalHandler,
} from "@coasys/flux-utils";
import TimelineBlock from "../TimelineBlock";
import styles from "./TimelineColumn.module.scss";
import { getAd4mClient } from "@coasys/ad4m-connect/utils";
import Avatar from "../Avatar";

type Props = {
Expand All @@ -23,17 +27,16 @@ export default function TimelineColumn({
}: Props) {
const [conversations, setConversations] = useState<any[]>([]);
const [unprocessedItems, setUnprocessedItems] = useState<any[]>([]);
const [processing, setProcessing] = useState(false);
const [processing, setProcessing] = useState<any>(null);
const [selectedItemId, setSelectedItemId] = useState<any>(null);
const [zoom, setZoom] = useState(groupingOptions[0]);
const [usingLLM, setUsingLLM] = useState(false);
const timeout = useRef<any>(null);
const totalConversationItems = useRef(0);

async function runProcessingCheckIfNewItems() {
const conversationItems = await getSynergyItems(perspective, channelId);
if (conversationItems.length > totalConversationItems.current)
runProcessingCheck(perspective, channelId);
runProcessingCheck(perspective, channelId, setProcessing);
totalConversationItems.current = conversationItems.length;
}

Expand All @@ -43,26 +46,25 @@ export default function TimelineColumn({
setUnprocessedItems(data.unprocessedItems);
setConversations(data.conversations);
}

async function checkIfUsingLLM() {
const client = await getAd4mClient();
const defaultLLM = await client.ai.getDefaultModel("LLM");
setUsingLLM(!!defaultLLM);
}

function linkAddedListener() {
if (timeout.current) clearTimeout(timeout.current);
timeout.current = setTimeout(getData, 2000);
}

useEffect(() => {
// add signal listener
addSynergySignalHandler(perspective, setProcessing);
// add listener for new links
perspective.addListener("link-added", linkAddedListener);
checkIfUsingLLM();
getData();

return () => perspective.removeListener("link-added", linkAddedListener);
}, []);

useEffect(() => {
setProcessing(itemsBeingProcessed.find((item) => item.channelId === channelId) || null);
}, [itemsBeingProcessed]);

return (
<div className={styles.wrapper}>
<j-flex a="center" j="between" className={styles.header}>
Expand Down Expand Up @@ -110,6 +112,12 @@ export default function TimelineColumn({
<j-text uppercase size="400" weight="800" color="primary-500">
Unprocessed Items
</j-text>
{processing && (
<j-flex a="center">
<j-text nomargin>{processing.items.length} items being processed by</j-text>
<Avatar did={processing.author} showName />
</j-flex>
)}
{unprocessedItems.map((item) => (
<j-flex gap="400" a="center" className={styles.itemCard}>
<j-flex gap="300" direction="column">
Expand All @@ -118,6 +126,9 @@ export default function TimelineColumn({
<j-flex gap="400" a="center" wrap>
<Avatar did={item.author} showName />
</j-flex>
{processing && processing.items.includes(item.baseExpression) && (
<j-badge variant="success">Processing...</j-badge>
)}
</j-flex>
<j-text
nomargin
Expand Down
12 changes: 9 additions & 3 deletions views/synergy-demo-view/src/utils/index.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { Conversation, ConversationSubgroup, SemanticRelationship } from "@coasys/flux-api";
import { findTopics, getSynergyItems, findUnprocessedItems } from "@coasys/flux-utils";
import { Conversation, ConversationSubgroup } from "@coasys/flux-api";
import {
findTopics,
getSynergyItems,
findUnprocessedItems,
findAllChannelSubgroupIds,
} from "@coasys/flux-utils";
import { LinkQuery } from "@coasys/ad4m";

// constants
Expand All @@ -17,7 +22,8 @@ export async function getConversationData(perspective, channelId, match?, setMat
// gather up unprocessed items
const channelItems = await getSynergyItems(perspective, channelId);
const conversations = (await Conversation.query(perspective, { source: channelId })) as any;
const unprocessedItems = await findUnprocessedItems(perspective, channelItems, conversations);
const allSubgroupIds = await findAllChannelSubgroupIds(perspective, conversations);
const unprocessedItems = await findUnprocessedItems(perspective, channelItems, allSubgroupIds);
// find & attach timestamp to converations
const conversationsWithTimestamps = await Promise.all(
conversations.map(
Expand Down

0 comments on commit a454f77

Please sign in to comment.