-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Clean up code and improve thread processing workflow #1797
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,7 +65,7 @@ type MainWorkflowError = | |
|
|
||
| const validateArguments = ( | ||
| params: MainWorkflowParams, | ||
| serviceAccount: any, | ||
| serviceAccount: { project_id: string }, | ||
| ): Effect.Effect<string, MainWorkflowError> => | ||
| Effect.gen(function* () { | ||
| yield* Console.log('[MAIN_WORKFLOW] Validating arguments'); | ||
|
|
@@ -87,6 +87,12 @@ const validateArguments = ( | |
|
|
||
| const override = false; | ||
|
|
||
| /** | ||
| * This function runs the main workflow. The main workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. | ||
| * It validates the subscription name and extracts the connection ID. | ||
| * @param params | ||
| * @returns | ||
| */ | ||
| export const runMainWorkflow = ( | ||
| params: MainWorkflowParams, | ||
| ): Effect.Effect<string, MainWorkflowError> => | ||
|
|
@@ -265,36 +271,82 @@ export const runZeroWorkflow = ( | |
| }); | ||
|
|
||
| // Extract thread IDs from history | ||
| const threadIds = new Set<string>(); | ||
| const threadsChanged = new Set<string>(); | ||
| const threadsAdded = new Set<string>(); | ||
| history.forEach((historyItem) => { | ||
| if (historyItem.messagesAdded) { | ||
| historyItem.messagesAdded.forEach((messageAdded) => { | ||
| if (messageAdded.message?.threadId) { | ||
| threadIds.add(messageAdded.message.threadId); | ||
| threadsChanged.add(messageAdded.message.threadId); | ||
| threadsAdded.add(messageAdded.message.threadId); | ||
| } | ||
| }); | ||
| } | ||
| if (historyItem.labelsAdded) { | ||
| historyItem.labelsAdded.forEach((labelAdded) => { | ||
| if (labelAdded.message?.threadId) { | ||
| threadIds.add(labelAdded.message.threadId); | ||
| threadsChanged.add(labelAdded.message.threadId); | ||
| } | ||
| }); | ||
| } | ||
| if (historyItem.labelsRemoved) { | ||
| historyItem.labelsRemoved.forEach((labelRemoved) => { | ||
| if (labelRemoved.message?.threadId) { | ||
| threadIds.add(labelRemoved.message.threadId); | ||
| threadsChanged.add(labelRemoved.message.threadId); | ||
| } | ||
| }); | ||
| } | ||
| }); | ||
|
|
||
| yield* Console.log('[ZERO_WORKFLOW] Found unique thread IDs:', Array.from(threadIds)); | ||
| yield* Console.log( | ||
| '[ZERO_WORKFLOW] Found unique thread IDs:', | ||
| Array.from(threadsChanged), | ||
MrgSub marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Array.from(threadsAdded), | ||
| ); | ||
|
|
||
| if (threadsAdded.size > 0) { | ||
| const threadWorkflowParams = Array.from(threadsAdded); | ||
|
|
||
| // Sync threads with proper error handling - use allSuccesses to collect successful syncs | ||
| const syncResults = yield* Effect.allSuccesses( | ||
| threadWorkflowParams.map((threadId) => | ||
| Effect.tryPromise({ | ||
| try: async () => { | ||
| const result = await agent.syncThread({ threadId }); | ||
| console.log(`[ZERO_WORKFLOW] Successfully synced thread ${threadId}`); | ||
| return { threadId, result }; | ||
| }, | ||
| catch: (error) => { | ||
| console.error(`[ZERO_WORKFLOW] Failed to sync thread ${threadId}:`, error); | ||
| // Let this effect fail so allSuccesses will exclude it | ||
| throw new Error( | ||
| `Failed to sync thread ${threadId}: ${error instanceof Error ? error.message : String(error)}`, | ||
| ); | ||
| }, | ||
| }), | ||
| ), | ||
| { concurrency: 1 }, // Limit concurrency to avoid rate limits | ||
| ); | ||
|
|
||
| const syncedCount = syncResults.length; | ||
| const failedCount = threadWorkflowParams.length - syncedCount; | ||
|
|
||
| if (failedCount > 0) { | ||
| yield* Console.log( | ||
| `[ZERO_WORKFLOW] Warning: ${failedCount}/${threadWorkflowParams.length} thread syncs failed. Successfully synced: ${syncedCount}`, | ||
| ); | ||
| // Continue with processing - sync failures shouldn't stop the entire workflow | ||
| // The thread processing will continue with whatever data is available | ||
| } else { | ||
| yield* Console.log(`[ZERO_WORKFLOW] Successfully synced all ${syncedCount} threads`); | ||
| } | ||
|
|
||
| yield* Console.log('[ZERO_WORKFLOW] Synced threads:', syncResults); | ||
| } | ||
|
|
||
| // Process all threads concurrently using Effect.all | ||
| if (threadIds.size > 0) { | ||
| const threadWorkflowParams = Array.from(threadIds).map((threadId) => ({ | ||
| if (threadsChanged.size > 0) { | ||
| const threadWorkflowParams = Array.from(threadsChanged).map((threadId) => ({ | ||
| connectionId, | ||
| threadId, | ||
| providerId: foundConnection.providerId, | ||
|
|
@@ -339,7 +391,7 @@ export const runZeroWorkflow = ( | |
| ); | ||
| }), | ||
| ), | ||
| { concurrency: 1 }, // Process up to 5 threads concurrently | ||
| { concurrency: 1, discard: true }, // Process up to 5 threads concurrently | ||
| ); | ||
|
|
||
| yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults.length); | ||
|
|
@@ -407,6 +459,11 @@ type ThreadWorkflowError = | |
| | { _tag: 'GmailApiError'; error: unknown } | ||
| | { _tag: 'VectorizationError'; error: unknown }; | ||
|
|
||
| /** | ||
| * Runs the main workflow for processing a thread. The workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. | ||
| * @param params | ||
| * @returns | ||
| */ | ||
| export const runThreadWorkflow = ( | ||
| params: ThreadWorkflowParams, | ||
| ): Effect.Effect<string, ThreadWorkflowError> => | ||
|
|
@@ -613,12 +670,9 @@ export const runThreadWorkflow = ( | |
| { role: 'system', content: SummarizeMessagePrompt }, | ||
| { role: 'user', content: prompt }, | ||
| ]; | ||
| const response: any = await env.AI.run( | ||
| '@cf/meta/llama-4-scout-17b-16e-instruct', | ||
| { | ||
| messages, | ||
| }, | ||
| ); | ||
| const response = await env.AI.run('@cf/meta/llama-4-scout-17b-16e-instruct', { | ||
| messages, | ||
| }); | ||
| console.log( | ||
| `[THREAD_WORKFLOW] Summary generated for message ${message.id}:`, | ||
| response, | ||
|
|
@@ -696,7 +750,7 @@ export const runThreadWorkflow = ( | |
| return null; | ||
| } | ||
| console.log('[THREAD_WORKFLOW] Found existing thread summary'); | ||
| return threadSummary[0].metadata as any; | ||
| return threadSummary[0].metadata as { summary: string; lastMsg: string }; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider defining an interface for the metadata structure instead of using a type assertion. This would improve type safety and align with the Google TypeScript Style Guide's preference for type annotations over assertions: interface ThreadSummaryMetadata {
summary: string;
lastMsg: string;
}
// Then use it like:
return threadSummary[0].metadata as ThreadSummaryMetadata;Even better would be to properly type the database response so no assertion is needed at all. Spotted by Diamond (based on custom rules) |
||
| }, | ||
| catch: (error) => ({ _tag: 'VectorizationError' as const, error }), | ||
| }); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.