
Commit e52ad09

feat(session): add bi-directional cursor-based pagination with Home/End navigation
Implements cursor-based pagination for message loading so long sessions no longer blow up memory, plus absolute navigation via Home/End keys.

API changes:
- Add 'before' cursor param: fetch messages older than cursor (newest first)
- Add 'after' cursor param: fetch messages newer than cursor (oldest first)
- Add 'oldest' param: start from oldest messages (for jumpToOldest)
- Link headers with rel="prev"/"next" for cursor discovery (RFC 5005)

TUI changes:
- loadOlder/loadNewer actions with sliding window eviction (500 msg limit)
- jumpToOldest (Home): fetches oldest page via ?oldest=true
- jumpToLatest (End): fetches newest page, preserves revert marker
- Detached mode: ignores SSE when viewing history to prevent gaps

Implementation:
- Binary.lowerBound for efficient cursor lookup
- parseLinkHeader utility for RFC 5988 parsing
- Message.stream() reverse option for ascending order
- Smart parts cleanup: only deletes parts for evicted messages

Tests:
- Unit tests for pagination logic and cursor handling
- API tests for before/after/oldest params and Link headers

Resolves: anomalyco#6548
1 parent b3901ac commit e52ad09
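
The commit message mentions a parseLinkHeader utility for RFC 5988 parsing; the diff below only relies on its .prev/.next lookups. A minimal sketch of such a parser, assuming it maps each rel value to its target URL (the actual @/util/link-header implementation may handle more cases):

// Hedged sketch of a Link-header parser with the shape the diff below expects.
// Assumes RFC 5988-style headers such as:
//   <?before=...&limit=100>; rel="prev", <?after=...&limit=100>; rel="next"
export function parseLinkHeader(header: string): Record<string, string> {
  const links: Record<string, string> = {}
  for (const entry of header.split(",")) {
    const match = entry.trim().match(/^<([^>]*)>\s*;\s*rel="?([^";]+)"?/)
    if (match) links[match[2]] = match[1]
  }
  return links
}

// Usage as in sync.tsx below: a rel="prev" link signals that older messages exist.
// const hasOlder = parseLinkHeader(link).prev !== undefined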

File tree: 13 files changed, +1230 -32 lines


packages/opencode/src/cli/cmd/tui/context/sync.tsx

Lines changed: 285 additions & 4 deletions
@@ -28,6 +28,12 @@ import { useArgs } from "./args"
 import { batch, onMount } from "solid-js"
 import { Log } from "@/util/log"
 import type { Path } from "@opencode-ai/sdk"
+import { parseLinkHeader } from "@/util/link-header"
+
+/** Maximum messages kept in memory per session */
+const MAX_LOADED_MESSAGES = 500
+/** Chunk size for eviction when limit exceeded */
+const EVICTION_CHUNK_SIZE = 50
 
 export const { use: useSync, provider: SyncProvider } = createSimpleContext({
   name: "Sync",
@@ -48,6 +54,15 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
     }
     config: Config
     session: Session[]
+    message_page: {
+      [sessionID: string]: {
+        hasOlder: boolean
+        hasNewer: boolean
+        loading: boolean
+        loadingDirection?: "older" | "newer"
+        error?: string
+      }
+    }
     session_status: {
       [sessionID: string]: SessionStatus
     }
@@ -89,6 +104,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
     provider: [],
     provider_default: {},
     session: [],
+    message_page: {},
     session_status: {},
     session_diff: {},
     todo: {},
@@ -226,19 +242,24 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
         }
 
         case "message.updated": {
-          const messages = store.message[event.properties.info.sessionID]
+          const sessionID = event.properties.info.sessionID
+          const page = store.message_page[sessionID]
+          const messages = store.message[sessionID]
           if (!messages) {
-            setStore("message", event.properties.info.sessionID, [event.properties.info])
+            setStore("message", sessionID, [event.properties.info])
             break
           }
           const result = Binary.search(messages, event.properties.info.id, (m) => m.id)
           if (result.found) {
-            setStore("message", event.properties.info.sessionID, result.index, reconcile(event.properties.info))
+            setStore("message", sessionID, result.index, reconcile(event.properties.info))
+            break
+          }
+          if (page?.hasNewer) {
             break
           }
           setStore(
             "message",
-            event.properties.info.sessionID,
+            sessionID,
             produce((draft) => {
               draft.splice(result.index, 0, event.properties.info)
             }),
@@ -279,6 +300,13 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
           break
         }
         case "message.part.updated": {
+          const sessionID = event.properties.part.sessionID
+          const page = store.message_page[sessionID]
+          const messages = store.message[sessionID]
+          const messageExists = messages?.some((m) => m.id === event.properties.part.messageID)
+          if (page?.hasNewer && !messageExists) {
+            break
+          }
           const parts = store.part[event.properties.part.messageID]
           if (!parts) {
             setStore("part", event.properties.part.messageID, [event.properties.part])
@@ -389,6 +417,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
   })
 
   const fullSyncedSessions = new Set<string>()
+  const loadingGuard = new Set<string>()
   const result = {
     data: store,
     set: setStore,
@@ -422,6 +451,8 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
           sdk.client.session.todo({ sessionID }),
           sdk.client.session.diff({ sessionID }),
         ])
+        const link = messages.response.headers.get("link") ?? ""
+        const hasOlder = parseLinkHeader(link).prev !== undefined
         setStore(
           produce((draft) => {
             const match = Binary.search(draft.session, sessionID, (s) => s.id)
@@ -433,10 +464,260 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
               draft.part[message.info.id] = message.parts
             }
             draft.session_diff[sessionID] = diff.data ?? []
+            draft.message_page[sessionID] = { hasOlder, hasNewer: false, loading: false, error: undefined }
           }),
         )
         fullSyncedSessions.add(sessionID)
       },
+      async loadOlder(sessionID: string) {
+        const page = store.message_page[sessionID]
+        if (page?.loading || !page?.hasOlder) return
+        const messages = store.message[sessionID] ?? []
+        const oldest = messages.at(0)
+        if (!oldest) return
+        if (loadingGuard.has(sessionID)) return
+        loadingGuard.add(sessionID)
+        try {
+          setStore("message_page", sessionID, { ...page, loading: true, loadingDirection: "older", error: undefined })
+
+          const res = await sdk.client.session.messages(
+            { sessionID, before: oldest.id, limit: 100 },
+            { throwOnError: true },
+          )
+          const link = res.response.headers.get("link") ?? ""
+          const hasOlder = parseLinkHeader(link).prev !== undefined
+          setStore(
+            produce((draft) => {
+              const existing = draft.message[sessionID] ?? []
+              for (const msg of res.data ?? []) {
+                const match = Binary.search(existing, msg.info.id, (m) => m.id)
+                if (!match.found) {
+                  existing.splice(match.index, 0, msg.info)
+                  draft.part[msg.info.id] = msg.parts
+                }
+              }
+              if (existing.length > MAX_LOADED_MESSAGES + EVICTION_CHUNK_SIZE) {
+                const evicted = existing.splice(-(existing.length - MAX_LOADED_MESSAGES))
+                for (const msg of evicted) delete draft.part[msg.id]
+                draft.message_page[sessionID] = { hasOlder, hasNewer: true, loading: false, error: undefined }
+              } else {
+                draft.message_page[sessionID] = {
+                  hasOlder,
+                  hasNewer: draft.message_page[sessionID]?.hasNewer ?? false,
+                  loading: false,
+                  error: undefined,
+                }
+              }
+            }),
+          )
+        } catch (e) {
+          const page = store.message_page[sessionID]
+          setStore("message_page", sessionID, {
+            hasOlder: page?.hasOlder ?? false,
+            hasNewer: page?.hasNewer ?? false,
+            loading: false,
+            error: e instanceof Error ? e.message : String(e),
+          })
+        } finally {
+          loadingGuard.delete(sessionID)
+        }
+      },
+      async loadNewer(sessionID: string) {
+        const page = store.message_page[sessionID]
+        if (page?.loading || !page?.hasNewer) return
+        const messages = store.message[sessionID] ?? []
+        const newest = messages.at(-1)
+        if (!newest) return
+        if (loadingGuard.has(sessionID)) return
+        loadingGuard.add(sessionID)
+        try {
+          setStore("message_page", sessionID, { ...page, loading: true, loadingDirection: "newer", error: undefined })
+          const res = await sdk.client.session.messages(
+            { sessionID, after: newest.id, limit: 100 },
+            { throwOnError: true },
+          )
+          const link = res.response.headers.get("link") ?? ""
+          const hasNewer = parseLinkHeader(link).next !== undefined
+          setStore(
+            produce((draft) => {
+              const existing = draft.message[sessionID] ?? []
+              for (const msg of res.data ?? []) {
+                const match = Binary.search(existing, msg.info.id, (m) => m.id)
+                if (!match.found) {
+                  existing.splice(match.index, 0, msg.info)
+                  draft.part[msg.info.id] = msg.parts
+                }
+              }
+              if (existing.length > MAX_LOADED_MESSAGES + EVICTION_CHUNK_SIZE) {
+                const evicted = existing.splice(0, existing.length - MAX_LOADED_MESSAGES)
+                for (const msg of evicted) delete draft.part[msg.id]
+                draft.message_page[sessionID] = { hasOlder: true, hasNewer, loading: false, error: undefined }
+              } else {
+                draft.message_page[sessionID] = {
+                  hasOlder: draft.message_page[sessionID]?.hasOlder ?? false,
+                  hasNewer,
+                  loading: false,
+                  error: undefined,
+                }
+              }
+            }),
+          )
+        } catch (e) {
+          const page = store.message_page[sessionID]
+          setStore("message_page", sessionID, {
+            hasOlder: page?.hasOlder ?? false,
+            hasNewer: page?.hasNewer ?? false,
+            loading: false,
+            error: e instanceof Error ? e.message : String(e),
+          })
+        } finally {
+          loadingGuard.delete(sessionID)
+        }
+      },
+      async jumpToLatest(sessionID: string) {
+        const page = store.message_page[sessionID]
+        if (page?.loading || !page?.hasNewer) return
+        if (loadingGuard.has(sessionID)) return
+        loadingGuard.add(sessionID)
+
+        try {
+          // Check for revert state
+          const session = store.session.find((s) => s.id === sessionID)
+          const revertMessageID = session?.revert?.messageID
+
+          setStore("message_page", sessionID, {
+            ...page,
+            loading: true,
+            loadingDirection: "newer",
+            error: undefined,
+          })
+
+          // Fetch newest page (no cursor = newest)
+          const res = await sdk.client.session.messages({ sessionID, limit: 100 }, { throwOnError: true })
+
+          let messages = res.data ?? []
+          const link = res.response.headers.get("link") ?? ""
+          const hasOlder = parseLinkHeader(link).prev !== undefined
+
+          // Revert-aware: If in revert state and marker not in results, fetch it
+          if (revertMessageID && !messages.some((m) => m.info.id === revertMessageID)) {
+            try {
+              const revertResult = await sdk.client.session.message(
+                { sessionID, messageID: revertMessageID },
+                { throwOnError: true },
+              )
+              if (revertResult.data) {
+                // Prepend revert message (it's older than newest page)
+                messages = [revertResult.data, ...messages]
+              }
+            } catch (e) {
+              // Revert message may have been deleted, continue without it
+              Log.Default.info("Revert marker fetch failed (may be deleted)", {
+                messageID: revertMessageID,
+                error: e,
+              })
+            }
+          }
+
+          setStore(
+            produce((draft) => {
+              // Clean up parts only for messages not in new results
+              const oldMessages = draft.message[sessionID] ?? []
+              const newIds = new Set(messages.map((m) => m.info.id))
+              for (const msg of oldMessages) {
+                if (!newIds.has(msg.id)) {
+                  delete draft.part[msg.id]
+                }
+              }
+
+              // Store new messages
+              draft.message[sessionID] = messages.map((m) => m.info)
+              for (const msg of messages) {
+                draft.part[msg.info.id] = msg.parts
+              }
+              draft.message_page[sessionID] = {
+                hasOlder,
+                hasNewer: false,
+                loading: false,
+                error: undefined,
+              }
+            }),
+          )
+        } catch (e) {
+          setStore(
+            produce((draft) => {
+              const p = draft.message_page[sessionID]
+              if (p) {
+                p.loading = false
+                p.error = e instanceof Error ? e.message : String(e)
+              }
+            }),
+          )
+        } finally {
+          loadingGuard.delete(sessionID)
+        }
+      },
+      async jumpToOldest(sessionID: string) {
+        const page = store.message_page[sessionID]
+        if (page?.loading || !page?.hasOlder) return
+        if (loadingGuard.has(sessionID)) return
+        loadingGuard.add(sessionID)
+
+        try {
+          setStore("message_page", sessionID, {
+            ...page,
+            loading: true,
+            loadingDirection: "older",
+            error: undefined,
+          })
+
+          const res = await sdk.client.session.messages(
+            { sessionID, oldest: true, limit: 100 },
+            { throwOnError: true },
+          )
+
+          const messages = res.data ?? []
+          const link = res.response.headers.get("link") ?? ""
+          const hasNewer = parseLinkHeader(link).next !== undefined
+
+          setStore(
+            produce((draft) => {
+              // Clean up parts only for messages not in new results
+              const oldMessages = draft.message[sessionID] ?? []
+              const newIds = new Set(messages.map((m) => m.info.id))
+              for (const msg of oldMessages) {
+                if (!newIds.has(msg.id)) {
+                  delete draft.part[msg.id]
+                }
+              }
+
+              // Store new messages
+              draft.message[sessionID] = messages.map((m) => m.info)
+              for (const msg of messages) {
+                draft.part[msg.info.id] = msg.parts
+              }
+              draft.message_page[sessionID] = {
+                hasOlder: false,
+                hasNewer,
+                loading: false,
+                error: undefined,
+              }
+            }),
+          )
+        } catch (e) {
+          setStore(
+            produce((draft) => {
+              const p = draft.message_page[sessionID]
+              if (p) {
+                p.loading = false
+                p.error = e instanceof Error ? e.message : String(e)
+              }
+            }),
+          )
+        } finally {
+          loadingGuard.delete(sessionID)
+        }
+      },
     },
     bootstrap,
   }
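
For orientation, a hedged sketch (not part of this commit) of how a client could page backward through a long session with the new before cursor, reusing the sdk.client.session.messages call and the rel="prev" Link-header check that loadOlder uses above; walkOlder is a hypothetical helper name:

// Sketch only: assumes the sdk client and the parseLinkHeader helper shown earlier are in scope.
async function* walkOlder(sessionID: string) {
  let before: string | undefined // no cursor on the first request = newest page
  while (true) {
    const res = await sdk.client.session.messages(
      { sessionID, before, limit: 100 },
      { throwOnError: true },
    )
    const page = res.data ?? []
    if (page.length === 0) break
    yield page
    // rel="prev" advertises that an older page exists (the cursor-discovery convention above)
    const link = parseLinkHeader(res.response.headers.get("link") ?? "")
    if (link.prev === undefined) break
    // Use the smallest message id in the page as the next 'before' cursor,
    // without assuming how the server orders messages within a page.
    before = page.map((m) => m.info.id).sort()[0]
  }
}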
