Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions hub/src/sync/messageService.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,49 @@
import type { AttachmentMetadata, DecryptedMessage } from '@hapi/protocol/types'
import type { AgentState, AgentStateCompletedRequest, AgentStateRequest, AttachmentMetadata, DecryptedMessage } from '@hapi/protocol/types'
import type { Server } from 'socket.io'
import type { Store } from '../store'
import { EventPublisher } from './eventPublisher'

export type FilteredPermissions = {
requests: Record<string, AgentStateRequest>
completedRequests: Record<string, AgentStateCompletedRequest>
}

/**
* Filter permissions by time range based on messages.
* Only returns permissions that were created within or after the oldest message's time.
*/
export function filterPermissionsByTimeRange(
agentState: AgentState | null | undefined,
messages: DecryptedMessage[]
): FilteredPermissions {
if (!agentState || messages.length === 0) {
return { requests: {}, completedRequests: {} }
}

const oldestTime = Math.min(...messages.map(m => m.createdAt))

const requests: Record<string, AgentStateRequest> = {}
const completedRequests: Record<string, AgentStateCompletedRequest> = {}

// Filter pending requests
for (const [id, req] of Object.entries(agentState.requests ?? {})) {
const createdAt = req.createdAt ?? Date.now()
if (createdAt >= oldestTime) {
requests[id] = req
}
}

// Filter completed requests
for (const [id, req] of Object.entries(agentState.completedRequests ?? {})) {
const createdAt = req.createdAt ?? 0
if (createdAt >= oldestTime) {
completedRequests[id] = req
}
}

return { requests, completedRequests }
}

export class MessageService {
constructor(
private readonly store: Store,
Expand All @@ -11,14 +52,15 @@ export class MessageService {
) {
}

getMessagesPage(sessionId: string, options: { limit: number; beforeSeq: number | null }): {
getMessagesPage(sessionId: string, options: { limit: number; beforeSeq: number | null }, agentState?: AgentState | null): {
messages: DecryptedMessage[]
page: {
limit: number
beforeSeq: number | null
nextBeforeSeq: number | null
hasMore: boolean
}
permissions: FilteredPermissions
} {
const stored = this.store.messages.getMessages(sessionId, options.limit, options.beforeSeq ?? undefined)
const messages: DecryptedMessage[] = stored.map((message) => ({
Expand All @@ -41,14 +83,18 @@ export class MessageService {
const hasMore = nextBeforeSeq !== null
&& this.store.messages.getMessages(sessionId, 1, nextBeforeSeq).length > 0

// Filter permissions by time range
const permissions = filterPermissionsByTimeRange(agentState, messages)

return {
messages,
page: {
limit: options.limit,
beforeSeq: options.beforeSeq,
nextBeforeSeq,
hasMore
}
},
permissions
}
}

Expand Down
10 changes: 7 additions & 3 deletions hub/src/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* - No E2E encryption; data is stored as JSON in SQLite
*/

import type { DecryptedMessage, ModelMode, PermissionMode, Session, SyncEvent } from '@hapi/protocol/types'
import type { AgentState, DecryptedMessage, ModelMode, PermissionMode, Session, SyncEvent } from '@hapi/protocol/types'
import type { Server } from 'socket.io'
import type { Store } from '../store'
import type { RpcRegistry } from '../socket/rpcRegistry'
Expand Down Expand Up @@ -145,16 +145,20 @@ export class SyncEngine {
return this.machineCache.getOnlineMachinesByNamespace(namespace)
}

getMessagesPage(sessionId: string, options: { limit: number; beforeSeq: number | null }): {
getMessagesPage(sessionId: string, options: { limit: number; beforeSeq: number | null }, agentState?: AgentState | null): {
messages: DecryptedMessage[]
page: {
limit: number
beforeSeq: number | null
nextBeforeSeq: number | null
hasMore: boolean
}
permissions: {
requests: Record<string, unknown>
completedRequests: Record<string, unknown>
}
} {
return this.messageService.getMessagesPage(sessionId, options)
return this.messageService.getMessagesPage(sessionId, options, agentState)
}

getMessagesAfter(sessionId: string, options: { afterSeq: number; limit: number }): DecryptedMessage[] {
Expand Down
3 changes: 2 additions & 1 deletion hub/src/web/routes/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ export function createMessagesRoutes(getSyncEngine: () => SyncEngine | null): Ho
return sessionResult
}
const sessionId = sessionResult.sessionId
const agentState = sessionResult.session.agentState

const parsed = querySchema.safeParse(c.req.query())
const limit = parsed.success ? (parsed.data.limit ?? 50) : 50
const beforeSeq = parsed.success ? (parsed.data.beforeSeq ?? null) : null
return c.json(engine.getMessagesPage(sessionId, { limit, beforeSeq }))
return c.json(engine.getMessagesPage(sessionId, { limit, beforeSeq }, agentState))
})

app.post('/sessions/:id/messages', async (c) => {
Expand Down
12 changes: 0 additions & 12 deletions web/src/chat/reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,12 @@ export function reduceChatBlocks(
let hasReadyEvent = rootResult.hasReadyEvent

// Only create permission-only tool cards when there is no tool call/result in the transcript.
// Also skip if the permission is older than the oldest message in the current view,
// to avoid mixing old tool cards with newer messages when paginating.
const oldestMessageTime = normalized.length > 0
? Math.min(...normalized.map(m => m.createdAt))
: null

for (const [id, entry] of permissionsById) {
if (toolIdsInMessages.has(id)) continue
if (rootResult.toolBlocksById.has(id)) continue

const createdAt = entry.permission.createdAt ?? Date.now()

// Skip permissions that are older than the oldest message in the current view.
// These will be shown when the user loads older messages.
if (oldestMessageTime !== null && createdAt < oldestMessageTime) {
continue
}

const block = ensureToolBlock(rootResult.blocks, rootResult.toolBlocksById, id, {
createdAt,
localId: null,
Expand Down
38 changes: 35 additions & 3 deletions web/src/components/SessionChat.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
import { useNavigate } from '@tanstack/react-router'
import { AssistantRuntimeProvider } from '@assistant-ui/react'
import type { ApiClient } from '@/api/client'
import type { AttachmentMetadata, DecryptedMessage, ModelMode, PermissionMode, Session } from '@/types/api'
import type { AttachmentMetadata, DecryptedMessage, FilteredPermissions, ModelMode, PermissionMode, Session } from '@/types/api'
import type { ChatBlock, NormalizedMessage } from '@/chat/types'
import type { Suggestion } from '@/hooks/useActiveSuggestions'
import { normalizeDecryptedMessage } from '@/chat/normalize'
Expand All @@ -22,6 +22,7 @@ export function SessionChat(props: {
api: ApiClient
session: Session
messages: DecryptedMessage[]
permissions: FilteredPermissions
messagesWarning: string | null
hasMoreMessages: boolean
isLoadingMessages: boolean
Expand Down Expand Up @@ -179,9 +180,40 @@ export function SessionChat(props: {
return normalized
}, [props.messages])

// Build filtered agentState using permissions from message window
// For pending requests, always use real-time data from agentState (SSE updates)
// For completed requests, merge filtered API data with real-time SSE updates
// This ensures old tool cards are filtered out while new completions appear immediately
const filteredAgentState = useMemo(() => {
if (!props.session.agentState) return null

// Calculate oldest message time for filtering
const oldestMessageTime = normalizedMessages.length > 0
? Math.min(...normalizedMessages.map(m => m.createdAt))
: null

// Get live completed requests from SSE
const liveCompleted = props.session.agentState.completedRequests ?? {}

// Filter live completed requests by message time range
const filteredLiveCompleted = oldestMessageTime === null
? liveCompleted
: Object.fromEntries(
Object.entries(liveCompleted).filter(([, req]) => (req.createdAt ?? 0) >= oldestMessageTime)
)

return {
...props.session.agentState,
requests: props.session.agentState.requests ?? {},
// Merge API-filtered permissions with filtered live completions
// Live completions take precedence to show real-time updates
completedRequests: { ...props.permissions.completedRequests, ...filteredLiveCompleted }
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR] [Realtime] 完成态权限不会随着 SSE 更新

Why this is a problem: 现在 SessionChat 只使用 props.permissions.completedRequests,而该对象只在 fetchLatestMessages/fetchOlderMessages 时更新,message-received 的 SSE 路径不会刷新权限。这样新完成的权限(没有对应 tool message 的情况下)会一直不显示,直到用户手动刷新消息。

Suggested fix:

const oldestMessageTime = normalizedMessages.length > 0
    ? Math.min(...normalizedMessages.map(m => m.createdAt))
    : null
const liveCompleted = props.session.agentState?.completedRequests ?? {}
const filteredLiveCompleted = oldestMessageTime === null
    ? liveCompleted
    : Object.fromEntries(
        Object.entries(liveCompleted).filter(([, req]) => (req.createdAt ?? 0) >= oldestMessageTime)
    )
const filteredAgentState = {
    ...props.session.agentState,
    requests: props.session.agentState?.requests ?? {},
    completedRequests: { ...props.permissions.completedRequests, ...filteredLiveCompleted }
}

}, [props.session.agentState, props.permissions, normalizedMessages])

const reduced = useMemo(
() => reduceChatBlocks(normalizedMessages, props.session.agentState),
[normalizedMessages, props.session.agentState]
() => reduceChatBlocks(normalizedMessages, filteredAgentState),
[normalizedMessages, filteredAgentState]
)
const reconciled = useMemo(
() => reconcileChatBlocks(reduced.blocks, blocksByIdRef.current),
Expand Down
7 changes: 6 additions & 1 deletion web/src/hooks/queries/useMessages.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { useCallback, useEffect, useSyncExternalStore } from 'react'
import type { ApiClient } from '@/api/client'
import type { DecryptedMessage } from '@/types/api'
import type { DecryptedMessage, FilteredPermissions } from '@/types/api'
import {
clearMessageWindow,
fetchLatestMessages,
Expand All @@ -12,9 +12,12 @@ import {
type MessageWindowState,
} from '@/lib/message-window-store'

const emptyPermissions: FilteredPermissions = { requests: {}, completedRequests: {} }

const EMPTY_STATE: MessageWindowState = {
sessionId: 'unknown',
messages: [],
permissions: emptyPermissions,
pending: [],
pendingCount: 0,
hasMore: false,
Expand All @@ -29,6 +32,7 @@ const EMPTY_STATE: MessageWindowState = {

export function useMessages(api: ApiClient | null, sessionId: string | null): {
messages: DecryptedMessage[]
permissions: FilteredPermissions
warning: string | null
isLoading: boolean
isLoadingMore: boolean
Expand Down Expand Up @@ -98,6 +102,7 @@ export function useMessages(api: ApiClient | null, sessionId: string | null): {

return {
messages: state.messages,
permissions: state.permissions,
warning: state.warning,
isLoading: state.isLoading,
isLoadingMore: state.isLoadingMore,
Expand Down
17 changes: 16 additions & 1 deletion web/src/lib/message-window-store.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import type { ApiClient } from '@/api/client'
import type { DecryptedMessage, MessageStatus } from '@/types/api'
import type { DecryptedMessage, FilteredPermissions, MessageStatus } from '@/types/api'
import { normalizeDecryptedMessage } from '@/chat/normalize'
import { mergeMessages } from '@/lib/messages'

const emptyPermissions: FilteredPermissions = { requests: {}, completedRequests: {} }

export type MessageWindowState = {
sessionId: string
messages: DecryptedMessage[]
permissions: FilteredPermissions
pending: DecryptedMessage[]
pendingCount: number
hasMore: boolean
Expand Down Expand Up @@ -90,6 +93,7 @@ function createState(sessionId: string): InternalState {
return {
sessionId,
messages: [],
permissions: emptyPermissions,
pending: [],
pendingCount: 0,
pendingVisibleCount: 0,
Expand Down Expand Up @@ -158,6 +162,7 @@ function buildState(
prev: InternalState,
updates: {
messages?: DecryptedMessage[]
permissions?: FilteredPermissions
pending?: DecryptedMessage[]
pendingOverflowCount?: number
pendingVisibleCount?: number
Expand All @@ -170,6 +175,7 @@ function buildState(
}
): InternalState {
const messages = updates.messages ?? prev.messages
const permissions = updates.permissions ?? prev.permissions
const pending = updates.pending ?? prev.pending
const pendingOverflowCount = updates.pendingOverflowCount ?? prev.pendingOverflowCount
const pendingOverflowVisibleCount = updates.pendingOverflowVisibleCount ?? prev.pendingOverflowVisibleCount
Expand All @@ -188,6 +194,7 @@ function buildState(
return {
...prev,
messages,
permissions,
pending,
pendingOverflowCount,
pendingVisibleCount,
Expand Down Expand Up @@ -332,6 +339,7 @@ export async function fetchLatestMessages(api: ApiClient, sessionId: string): Pr
const trimmed = trimVisible(merged, 'append')
return buildState(prev, {
messages: trimmed,
permissions: response.permissions,
pending: [],
pendingOverflowCount: 0,
pendingVisibleCount: 0,
Expand All @@ -343,6 +351,7 @@ export async function fetchLatestMessages(api: ApiClient, sessionId: string): Pr
}
const pendingResult = mergeIntoPending(prev, response.messages)
return buildState(prev, {
permissions: response.permissions,
pending: pendingResult.pending,
pendingVisibleCount: pendingResult.pendingVisibleCount,
pendingOverflowCount: pendingResult.pendingOverflowCount,
Expand Down Expand Up @@ -372,8 +381,14 @@ export async function fetchOlderMessages(api: ApiClient, sessionId: string): Pro
updateState(sessionId, (prev) => {
const merged = mergeMessages(response.messages, prev.messages)
const trimmed = trimVisible(merged, 'prepend')
// Merge permissions: combine old and new permissions
const mergedPermissions: FilteredPermissions = {
requests: { ...prev.permissions.requests, ...response.permissions.requests },
completedRequests: { ...prev.permissions.completedRequests, ...response.permissions.completedRequests }
}
return buildState(prev, {
messages: trimmed,
permissions: mergedPermissions,
hasMore: response.page.hasMore,
isLoadingMore: false,
})
Expand Down
2 changes: 2 additions & 0 deletions web/src/lib/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export function upsertMessagesInCache(
incoming: DecryptedMessage[],
): InfiniteData<MessagesResponse> {
const mergedIncoming = mergeMessages([], incoming)
const emptyPermissions = { requests: {}, completedRequests: {} }

if (!data || data.pages.length === 0) {
return {
Expand All @@ -109,6 +110,7 @@ export function upsertMessagesInCache(
nextBeforeSeq: null,
hasMore: false,
},
permissions: emptyPermissions,
},
],
pageParams: [null],
Expand Down
2 changes: 2 additions & 0 deletions web/src/router.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ function SessionPage() {
} = useSession(api, sessionId)
const {
messages,
permissions: messagesPermissions,
warning: messagesWarning,
isLoading: messagesLoading,
isLoadingMore: messagesLoadingMore,
Expand Down Expand Up @@ -298,6 +299,7 @@ function SessionPage() {
api={api}
session={session}
messages={messages}
permissions={messagesPermissions}
messagesWarning={messagesWarning}
hasMoreMessages={messagesHasMore}
isLoadingMessages={messagesLoading}
Expand Down
Loading