Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ export function Prompt(props: PromptProps) {
const renderer = useRenderer()
const { theme, syntax } = useTheme()

const queuedMessages = createMemo(() => {
if (!props.sessionID) return []
const messages = sync.data.message[props.sessionID] ?? []
if (status().type !== "busy") return []
const sorted = [...messages].sort((a, b) => a.id.localeCompare(b.id))
const pending = sorted.findLast((m) => m.role === "assistant" && m.time.completed === undefined)
if (!pending) return []
return sorted.filter((m) => m.role === "user" && m.id > pending.id)
})

function promptModelWarning() {
toast.show({
variant: "warning",
Expand Down Expand Up @@ -749,6 +759,26 @@ export function Prompt(props: PromptProps) {
}
if (store.mode === "normal") autocomplete.onKeyDown(e)
if (!autocomplete.visible) {
if (keybind.match("history_previous", e) && input.plainText === "" && queuedMessages().length > 0) {
const queued = queuedMessages()
const last = queued.at(-1)
if (last && props.sessionID) {
const { data: msg } = await sdk.client.session.cancelQueue({
sessionID: props.sessionID,
messageID: last.id,
})
if (msg) {
const text = msg.parts
.filter((p) => p.type === "text")
.map((p) => p.text)
.join("")
input.setText(text)
input.cursorOffset = text.length
}
}
e.preventDefault()
return
}
if (
(keybind.match("history_previous", e) && input.cursorOffset === 0) ||
(keybind.match("history_next", e) && input.cursorOffset === input.plainText.length)
Expand Down
60 changes: 60 additions & 0 deletions packages/opencode/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,66 @@ export namespace Server {
return c.json(true)
},
)
.get(
"/session/:sessionID/queue",
describeRoute({
summary: "Get queued messages",
description: "Get list of message IDs that are queued and waiting to be processed.",
operationId: "session.queue",
responses: {
200: {
description: "List of queued message IDs",
content: {
"application/json": {
schema: resolver(z.array(z.string())),
},
},
},
...errors(400, 404),
},
}),
validator(
"param",
z.object({
sessionID: z.string(),
}),
),
async (c) => {
const queued = SessionPrompt.queued(c.req.valid("param").sessionID)
return c.json(queued)
},
)
.delete(
"/session/:sessionID/queue/:messageID",
describeRoute({
summary: "Cancel queued message",
description: "Cancel a queued message that has not yet been processed by the agent.",
operationId: "session.cancelQueue",
responses: {
200: {
description: "Cancelled message with parts",
content: {
"application/json": {
schema: resolver(MessageV2.WithParts.nullable()),
},
},
},
...errors(400, 404),
},
}),
validator(
"param",
z.object({
sessionID: z.string(),
messageID: z.string(),
}),
),
async (c) => {
const { sessionID, messageID } = c.req.valid("param")
const message = await SessionPrompt.cancelQueued(sessionID, messageID)
return c.json(message ?? null)
},
)
.post(
"/session/:sessionID/share",
describeRoute({
Expand Down
36 changes: 32 additions & 4 deletions packages/opencode/src/session/prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export namespace SessionPrompt {
{
abort: AbortController
callbacks: {
messageID: string
resolve(input: MessageV2.WithParts): void
reject(): void
}[]
Expand Down Expand Up @@ -206,7 +207,7 @@ export namespace SessionPrompt {
return message
}

return loop(input.sessionID)
return loop(input.sessionID, message.info.id)
})

function start(sessionID: string) {
Expand Down Expand Up @@ -234,12 +235,39 @@ export namespace SessionPrompt {
return
}

export const loop = fn(Identifier.schema("session"), async (sessionID) => {
export function queued(sessionID: string) {
const s = state()
const match = s[sessionID]
if (!match) return []
return match.callbacks.map((c) => c.messageID).filter(Boolean)
}

export async function cancelQueued(sessionID: string, messageID: string) {
log.info("cancelQueued", { sessionID, messageID })
const s = state()
const match = s[sessionID]
if (!match) return

const index = match.callbacks.findIndex((c) => c.messageID === messageID)
if (index === -1) return

const [removed] = match.callbacks.splice(index, 1)
removed.reject()

const messages = await Session.messages({ sessionID })
const message = messages.find((m) => m.info.id === messageID)
if (!message) return

await Session.removeMessage({ sessionID, messageID })
return message
}

export async function loop(sessionID: string, messageID?: string) {
const abort = start(sessionID)
if (!abort) {
return new Promise<MessageV2.WithParts>((resolve, reject) => {
const callbacks = state()[sessionID].callbacks
callbacks.push({ resolve, reject })
callbacks.push({ messageID: messageID!, resolve, reject })
})
}

Expand Down Expand Up @@ -663,7 +691,7 @@ export namespace SessionPrompt {
return item
}
throw new Error("Impossible")
})
}

async function lastModel(sessionID: string) {
for await (const item of MessageV2.stream(sessionID)) {
Expand Down
68 changes: 68 additions & 0 deletions packages/sdk/js/src/v2/gen/sdk.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ import type {
PtyUpdateResponses,
SessionAbortErrors,
SessionAbortResponses,
SessionCancelQueueErrors,
SessionCancelQueueResponses,
SessionChildrenErrors,
SessionChildrenResponses,
SessionCommandErrors,
Expand All @@ -97,6 +99,8 @@ import type {
SessionPromptAsyncResponses,
SessionPromptErrors,
SessionPromptResponses,
SessionQueueErrors,
SessionQueueResponses,
SessionRevertErrors,
SessionRevertResponses,
SessionShareErrors,
Expand Down Expand Up @@ -1024,6 +1028,70 @@ export class Session extends HeyApiClient {
})
}

/**
* Get queued messages
*
* Get list of message IDs that are queued and waiting to be processed.
*/
public queue<ThrowOnError extends boolean = false>(
parameters: {
sessionID: string
directory?: string
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams(
[parameters],
[
{
args: [
{ in: "path", key: "sessionID" },
{ in: "query", key: "directory" },
],
},
],
)
return (options?.client ?? this.client).get<SessionQueueResponses, SessionQueueErrors, ThrowOnError>({
url: "/session/{sessionID}/queue",
...options,
...params,
})
}

/**
* Cancel queued message
*
* Cancel a queued message that has not yet been processed by the agent.
*/
public cancelQueue<ThrowOnError extends boolean = false>(
parameters: {
sessionID: string
messageID: string
directory?: string
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams(
[parameters],
[
{
args: [
{ in: "path", key: "sessionID" },
{ in: "path", key: "messageID" },
{ in: "query", key: "directory" },
],
},
],
)
return (options?.client ?? this.client).delete<SessionCancelQueueResponses, SessionCancelQueueErrors, ThrowOnError>(
{
url: "/session/{sessionID}/queue/{messageID}",
...options,
...params,
},
)
}

/**
* Unshare session
*
Expand Down
70 changes: 70 additions & 0 deletions packages/sdk/js/src/v2/gen/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2606,6 +2606,76 @@ export type SessionAbortResponses = {

export type SessionAbortResponse = SessionAbortResponses[keyof SessionAbortResponses]

export type SessionQueueData = {
body?: never
path: {
sessionID: string
}
query?: {
directory?: string
}
url: "/session/{sessionID}/queue"
}

export type SessionQueueErrors = {
/**
* Bad request
*/
400: BadRequestError
/**
* Not found
*/
404: NotFoundError
}

export type SessionQueueError = SessionQueueErrors[keyof SessionQueueErrors]

export type SessionQueueResponses = {
/**
* List of queued message IDs
*/
200: Array<string>
}

export type SessionQueueResponse = SessionQueueResponses[keyof SessionQueueResponses]

export type SessionCancelQueueData = {
body?: never
path: {
sessionID: string
messageID: string
}
query?: {
directory?: string
}
url: "/session/{sessionID}/queue/{messageID}"
}

export type SessionCancelQueueErrors = {
/**
* Bad request
*/
400: BadRequestError
/**
* Not found
*/
404: NotFoundError
}

export type SessionCancelQueueError = SessionCancelQueueErrors[keyof SessionCancelQueueErrors]

export type SessionCancelQueueResponses = {
/**
* Cancelled message with parts
*/
200: {
info: Message
parts: Array<Part>
} | null
}

export type SessionCancelQueueResponse = SessionCancelQueueResponses[keyof SessionCancelQueueResponses]

export type SessionUnshareData = {
body?: never
path: {
Expand Down
Loading