diff --git a/hub/src/sync/messageService.ts b/hub/src/sync/messageService.ts index c5bfd3163..02cd521a0 100644 --- a/hub/src/sync/messageService.ts +++ b/hub/src/sync/messageService.ts @@ -1,8 +1,49 @@ -import type { AttachmentMetadata, DecryptedMessage } from '@hapi/protocol/types' +import type { AgentState, AgentStateCompletedRequest, AgentStateRequest, AttachmentMetadata, DecryptedMessage } from '@hapi/protocol/types' import type { Server } from 'socket.io' import type { Store } from '../store' import { EventPublisher } from './eventPublisher' +export type FilteredPermissions = { + requests: Record + completedRequests: Record +} + +/** + * Filter permissions by time range based on messages. + * Only returns permissions that were created within or after the oldest message's time. + */ +export function filterPermissionsByTimeRange( + agentState: AgentState | null | undefined, + messages: DecryptedMessage[] +): FilteredPermissions { + if (!agentState || messages.length === 0) { + return { requests: {}, completedRequests: {} } + } + + const oldestTime = Math.min(...messages.map(m => m.createdAt)) + + const requests: Record = {} + const completedRequests: Record = {} + + // Filter pending requests + for (const [id, req] of Object.entries(agentState.requests ?? {})) { + const createdAt = req.createdAt ?? Date.now() + if (createdAt >= oldestTime) { + requests[id] = req + } + } + + // Filter completed requests + for (const [id, req] of Object.entries(agentState.completedRequests ?? {})) { + const createdAt = req.createdAt ?? 0 + if (createdAt >= oldestTime) { + completedRequests[id] = req + } + } + + return { requests, completedRequests } +} + export class MessageService { constructor( private readonly store: Store, @@ -11,7 +52,7 @@ export class MessageService { ) { } - getMessagesPage(sessionId: string, options: { limit: number; beforeSeq: number | null }): { + getMessagesPage(sessionId: string, options: { limit: number; beforeSeq: number | null }, agentState?: AgentState | null): { messages: DecryptedMessage[] page: { limit: number @@ -19,6 +60,7 @@ export class MessageService { nextBeforeSeq: number | null hasMore: boolean } + permissions: FilteredPermissions } { const stored = this.store.messages.getMessages(sessionId, options.limit, options.beforeSeq ?? undefined) const messages: DecryptedMessage[] = stored.map((message) => ({ @@ -41,6 +83,9 @@ export class MessageService { const hasMore = nextBeforeSeq !== null && this.store.messages.getMessages(sessionId, 1, nextBeforeSeq).length > 0 + // Filter permissions by time range + const permissions = filterPermissionsByTimeRange(agentState, messages) + return { messages, page: { @@ -48,7 +93,8 @@ export class MessageService { beforeSeq: options.beforeSeq, nextBeforeSeq, hasMore - } + }, + permissions } } diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index 1ab46e65f..3fdda791c 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -7,7 +7,7 @@ * - No E2E encryption; data is stored as JSON in SQLite */ -import type { DecryptedMessage, ModelMode, PermissionMode, Session, SyncEvent } from '@hapi/protocol/types' +import type { AgentState, DecryptedMessage, ModelMode, PermissionMode, Session, SyncEvent } from '@hapi/protocol/types' import type { Server } from 'socket.io' import type { Store } from '../store' import type { RpcRegistry } from '../socket/rpcRegistry' @@ -145,7 +145,7 @@ export class SyncEngine { return this.machineCache.getOnlineMachinesByNamespace(namespace) } - getMessagesPage(sessionId: string, options: { limit: number; beforeSeq: number | null }): { + getMessagesPage(sessionId: string, options: { limit: number; beforeSeq: number | null }, agentState?: AgentState | null): { messages: DecryptedMessage[] page: { limit: number @@ -153,8 +153,12 @@ export class SyncEngine { nextBeforeSeq: number | null hasMore: boolean } + permissions: { + requests: Record + completedRequests: Record + } } { - return this.messageService.getMessagesPage(sessionId, options) + return this.messageService.getMessagesPage(sessionId, options, agentState) } getMessagesAfter(sessionId: string, options: { afterSeq: number; limit: number }): DecryptedMessage[] { diff --git a/hub/src/web/routes/messages.ts b/hub/src/web/routes/messages.ts index 492298f28..8fda1cb6d 100644 --- a/hub/src/web/routes/messages.ts +++ b/hub/src/web/routes/messages.ts @@ -30,11 +30,12 @@ export function createMessagesRoutes(getSyncEngine: () => SyncEngine | null): Ho return sessionResult } const sessionId = sessionResult.sessionId + const agentState = sessionResult.session.agentState const parsed = querySchema.safeParse(c.req.query()) const limit = parsed.success ? (parsed.data.limit ?? 50) : 50 const beforeSeq = parsed.success ? (parsed.data.beforeSeq ?? null) : null - return c.json(engine.getMessagesPage(sessionId, { limit, beforeSeq })) + return c.json(engine.getMessagesPage(sessionId, { limit, beforeSeq }, agentState)) }) app.post('/sessions/:id/messages', async (c) => { diff --git a/web/src/chat/reducer.ts b/web/src/chat/reducer.ts index 798499c67..45046eca5 100644 --- a/web/src/chat/reducer.ts +++ b/web/src/chat/reducer.ts @@ -48,24 +48,12 @@ export function reduceChatBlocks( let hasReadyEvent = rootResult.hasReadyEvent // Only create permission-only tool cards when there is no tool call/result in the transcript. - // Also skip if the permission is older than the oldest message in the current view, - // to avoid mixing old tool cards with newer messages when paginating. - const oldestMessageTime = normalized.length > 0 - ? Math.min(...normalized.map(m => m.createdAt)) - : null - for (const [id, entry] of permissionsById) { if (toolIdsInMessages.has(id)) continue if (rootResult.toolBlocksById.has(id)) continue const createdAt = entry.permission.createdAt ?? Date.now() - // Skip permissions that are older than the oldest message in the current view. - // These will be shown when the user loads older messages. - if (oldestMessageTime !== null && createdAt < oldestMessageTime) { - continue - } - const block = ensureToolBlock(rootResult.blocks, rootResult.toolBlocksById, id, { createdAt, localId: null, diff --git a/web/src/components/SessionChat.tsx b/web/src/components/SessionChat.tsx index 3990cc295..0f7184ead 100644 --- a/web/src/components/SessionChat.tsx +++ b/web/src/components/SessionChat.tsx @@ -2,7 +2,7 @@ import { useCallback, useEffect, useMemo, useRef, useState } from 'react' import { useNavigate } from '@tanstack/react-router' import { AssistantRuntimeProvider } from '@assistant-ui/react' import type { ApiClient } from '@/api/client' -import type { AttachmentMetadata, DecryptedMessage, ModelMode, PermissionMode, Session } from '@/types/api' +import type { AttachmentMetadata, DecryptedMessage, FilteredPermissions, ModelMode, PermissionMode, Session } from '@/types/api' import type { ChatBlock, NormalizedMessage } from '@/chat/types' import type { Suggestion } from '@/hooks/useActiveSuggestions' import { normalizeDecryptedMessage } from '@/chat/normalize' @@ -22,6 +22,7 @@ export function SessionChat(props: { api: ApiClient session: Session messages: DecryptedMessage[] + permissions: FilteredPermissions messagesWarning: string | null hasMoreMessages: boolean isLoadingMessages: boolean @@ -179,9 +180,40 @@ export function SessionChat(props: { return normalized }, [props.messages]) + // Build filtered agentState using permissions from message window + // For pending requests, always use real-time data from agentState (SSE updates) + // For completed requests, merge filtered API data with real-time SSE updates + // This ensures old tool cards are filtered out while new completions appear immediately + const filteredAgentState = useMemo(() => { + if (!props.session.agentState) return null + + // Calculate oldest message time for filtering + const oldestMessageTime = normalizedMessages.length > 0 + ? Math.min(...normalizedMessages.map(m => m.createdAt)) + : null + + // Get live completed requests from SSE + const liveCompleted = props.session.agentState.completedRequests ?? {} + + // Filter live completed requests by message time range + const filteredLiveCompleted = oldestMessageTime === null + ? liveCompleted + : Object.fromEntries( + Object.entries(liveCompleted).filter(([, req]) => (req.createdAt ?? 0) >= oldestMessageTime) + ) + + return { + ...props.session.agentState, + requests: props.session.agentState.requests ?? {}, + // Merge API-filtered permissions with filtered live completions + // Live completions take precedence to show real-time updates + completedRequests: { ...props.permissions.completedRequests, ...filteredLiveCompleted } + } + }, [props.session.agentState, props.permissions, normalizedMessages]) + const reduced = useMemo( - () => reduceChatBlocks(normalizedMessages, props.session.agentState), - [normalizedMessages, props.session.agentState] + () => reduceChatBlocks(normalizedMessages, filteredAgentState), + [normalizedMessages, filteredAgentState] ) const reconciled = useMemo( () => reconcileChatBlocks(reduced.blocks, blocksByIdRef.current), diff --git a/web/src/hooks/queries/useMessages.ts b/web/src/hooks/queries/useMessages.ts index 237e16f87..84dfb2a2d 100644 --- a/web/src/hooks/queries/useMessages.ts +++ b/web/src/hooks/queries/useMessages.ts @@ -1,6 +1,6 @@ import { useCallback, useEffect, useSyncExternalStore } from 'react' import type { ApiClient } from '@/api/client' -import type { DecryptedMessage } from '@/types/api' +import type { DecryptedMessage, FilteredPermissions } from '@/types/api' import { clearMessageWindow, fetchLatestMessages, @@ -12,9 +12,12 @@ import { type MessageWindowState, } from '@/lib/message-window-store' +const emptyPermissions: FilteredPermissions = { requests: {}, completedRequests: {} } + const EMPTY_STATE: MessageWindowState = { sessionId: 'unknown', messages: [], + permissions: emptyPermissions, pending: [], pendingCount: 0, hasMore: false, @@ -29,6 +32,7 @@ const EMPTY_STATE: MessageWindowState = { export function useMessages(api: ApiClient | null, sessionId: string | null): { messages: DecryptedMessage[] + permissions: FilteredPermissions warning: string | null isLoading: boolean isLoadingMore: boolean @@ -98,6 +102,7 @@ export function useMessages(api: ApiClient | null, sessionId: string | null): { return { messages: state.messages, + permissions: state.permissions, warning: state.warning, isLoading: state.isLoading, isLoadingMore: state.isLoadingMore, diff --git a/web/src/lib/message-window-store.ts b/web/src/lib/message-window-store.ts index 76388b7de..82e027b7f 100644 --- a/web/src/lib/message-window-store.ts +++ b/web/src/lib/message-window-store.ts @@ -1,11 +1,14 @@ import type { ApiClient } from '@/api/client' -import type { DecryptedMessage, MessageStatus } from '@/types/api' +import type { DecryptedMessage, FilteredPermissions, MessageStatus } from '@/types/api' import { normalizeDecryptedMessage } from '@/chat/normalize' import { mergeMessages } from '@/lib/messages' +const emptyPermissions: FilteredPermissions = { requests: {}, completedRequests: {} } + export type MessageWindowState = { sessionId: string messages: DecryptedMessage[] + permissions: FilteredPermissions pending: DecryptedMessage[] pendingCount: number hasMore: boolean @@ -90,6 +93,7 @@ function createState(sessionId: string): InternalState { return { sessionId, messages: [], + permissions: emptyPermissions, pending: [], pendingCount: 0, pendingVisibleCount: 0, @@ -158,6 +162,7 @@ function buildState( prev: InternalState, updates: { messages?: DecryptedMessage[] + permissions?: FilteredPermissions pending?: DecryptedMessage[] pendingOverflowCount?: number pendingVisibleCount?: number @@ -170,6 +175,7 @@ function buildState( } ): InternalState { const messages = updates.messages ?? prev.messages + const permissions = updates.permissions ?? prev.permissions const pending = updates.pending ?? prev.pending const pendingOverflowCount = updates.pendingOverflowCount ?? prev.pendingOverflowCount const pendingOverflowVisibleCount = updates.pendingOverflowVisibleCount ?? prev.pendingOverflowVisibleCount @@ -188,6 +194,7 @@ function buildState( return { ...prev, messages, + permissions, pending, pendingOverflowCount, pendingVisibleCount, @@ -332,6 +339,7 @@ export async function fetchLatestMessages(api: ApiClient, sessionId: string): Pr const trimmed = trimVisible(merged, 'append') return buildState(prev, { messages: trimmed, + permissions: response.permissions, pending: [], pendingOverflowCount: 0, pendingVisibleCount: 0, @@ -343,6 +351,7 @@ export async function fetchLatestMessages(api: ApiClient, sessionId: string): Pr } const pendingResult = mergeIntoPending(prev, response.messages) return buildState(prev, { + permissions: response.permissions, pending: pendingResult.pending, pendingVisibleCount: pendingResult.pendingVisibleCount, pendingOverflowCount: pendingResult.pendingOverflowCount, @@ -372,8 +381,14 @@ export async function fetchOlderMessages(api: ApiClient, sessionId: string): Pro updateState(sessionId, (prev) => { const merged = mergeMessages(response.messages, prev.messages) const trimmed = trimVisible(merged, 'prepend') + // Merge permissions: combine old and new permissions + const mergedPermissions: FilteredPermissions = { + requests: { ...prev.permissions.requests, ...response.permissions.requests }, + completedRequests: { ...prev.permissions.completedRequests, ...response.permissions.completedRequests } + } return buildState(prev, { messages: trimmed, + permissions: mergedPermissions, hasMore: response.page.hasMore, isLoadingMore: false, }) diff --git a/web/src/lib/messages.ts b/web/src/lib/messages.ts index cd51888e3..ac3396c48 100644 --- a/web/src/lib/messages.ts +++ b/web/src/lib/messages.ts @@ -97,6 +97,7 @@ export function upsertMessagesInCache( incoming: DecryptedMessage[], ): InfiniteData { const mergedIncoming = mergeMessages([], incoming) + const emptyPermissions = { requests: {}, completedRequests: {} } if (!data || data.pages.length === 0) { return { @@ -109,6 +110,7 @@ export function upsertMessagesInCache( nextBeforeSeq: null, hasMore: false, }, + permissions: emptyPermissions, }, ], pageParams: [null], diff --git a/web/src/router.tsx b/web/src/router.tsx index 0dd3ad155..05f0f897f 100644 --- a/web/src/router.tsx +++ b/web/src/router.tsx @@ -191,6 +191,7 @@ function SessionPage() { } = useSession(api, sessionId) const { messages, + permissions: messagesPermissions, warning: messagesWarning, isLoading: messagesLoading, isLoadingMore: messagesLoadingMore, @@ -298,6 +299,7 @@ function SessionPage() { api={api} session={session} messages={messages} + permissions={messagesPermissions} messagesWarning={messagesWarning} hasMoreMessages={messagesHasMore} isLoadingMessages={messagesLoading} diff --git a/web/src/types/api.ts b/web/src/types/api.ts index 278a49bf2..5e0e0df48 100644 --- a/web/src/types/api.ts +++ b/web/src/types/api.ts @@ -1,4 +1,6 @@ import type { + AgentStateCompletedRequest, + AgentStateRequest, DecryptedMessage as ProtocolDecryptedMessage, Session, SessionSummary, @@ -8,6 +10,8 @@ import type { export type { AgentState, + AgentStateCompletedRequest, + AgentStateRequest, AttachmentMetadata, ModelMode, PermissionMode, @@ -61,6 +65,12 @@ export type AuthResponse = { export type SessionsResponse = { sessions: SessionSummary[] } export type SessionResponse = { session: Session } + +export type FilteredPermissions = { + requests: Record + completedRequests: Record +} + export type MessagesResponse = { messages: DecryptedMessage[] page: { @@ -69,6 +79,7 @@ export type MessagesResponse = { nextBeforeSeq: number | null hasMore: boolean } + permissions: FilteredPermissions } export type MachinesResponse = { machines: Machine[] }