diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 8dc2276b751..46ad668c567 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -103,6 +103,11 @@ use toml::Value as TomlValue; const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue."; const THREAD_EVENT_CHANNEL_CAPACITY: usize = 32768; +/// Baseline cadence for periodic stream commit animation ticks. +/// +/// Smooth-mode streaming drains one line per tick, so this interval controls +/// perceived typing speed for non-backlogged output. +const COMMIT_ANIMATION_TICK: Duration = tui::TARGET_FRAME_INTERVAL; #[derive(Debug, Clone)] pub struct AppExitInfo { @@ -1497,7 +1502,7 @@ impl App { let running = self.commit_anim_running.clone(); thread::spawn(move || { while running.load(Ordering::Relaxed) { - thread::sleep(Duration::from_millis(50)); + thread::sleep(COMMIT_ANIMATION_TICK); tx.send(AppEvent::CommitTick); } }); diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index eb2d1ad9655..9135a83b3ae 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -196,6 +196,9 @@ mod skills; use self::skills::collect_tool_mentions; use self::skills::find_app_mentions; use self::skills::find_skill_mentions_with_tool_mentions; +use crate::streaming::chunking::AdaptiveChunkingPolicy; +use crate::streaming::commit_tick::CommitTickScope; +use crate::streaming::commit_tick::run_commit_tick; use crate::streaming::controller::PlanStreamController; use crate::streaming::controller::StreamController; @@ -484,6 +487,7 @@ pub(crate) struct ChatWidget { rate_limit_warnings: RateLimitWarningState, rate_limit_switch_prompt: RateLimitSwitchPromptState, rate_limit_poller: Option>, + adaptive_chunking: AdaptiveChunkingPolicy, // Stream lifecycle controller stream_controller: Option, // Stream lifecycle controller for proposed plan output. @@ -768,6 +772,7 @@ impl ChatWidget { { self.add_boxed_history(cell); } + self.adaptive_chunking.reset(); } /// Update the status indicator header and details. @@ -942,6 +947,7 @@ impl ChatWidget { && controller.push(&delta) { self.app_event_tx.send(AppEvent::StartCommitAnimation); + self.run_catch_up_commit_tick(); } self.request_redraw(); } @@ -1019,6 +1025,7 @@ impl ChatWidget { self.saw_plan_item_this_turn = false; self.plan_delta_buffer.clear(); self.plan_item_active = false; + self.adaptive_chunking.reset(); self.plan_stream_controller = None; self.otel_manager.reset_runtime_metrics(); self.bottom_pane.clear_quit_shortcut_hint(); @@ -1280,7 +1287,9 @@ impl ChatWidget { self.last_unified_wait = None; self.unified_exec_wait_streak = None; self.clear_unified_exec_processes(); + self.adaptive_chunking.reset(); self.stream_controller = None; + self.plan_stream_controller = None; self.maybe_show_pending_rate_limit_prompt(); } @@ -1832,30 +1841,41 @@ impl ChatWidget { self.set_status(message, additional_details); } - /// Periodic tick to commit at most one queued line to history with a small delay, - /// animating the output. + /// Periodic tick for stream commits. In smooth mode this preserves one-line pacing, while + /// catch-up mode drains larger batches to reduce queue lag. pub(crate) fn on_commit_tick(&mut self) { - let mut has_controller = false; - let mut all_idle = true; - if let Some(controller) = self.stream_controller.as_mut() { - has_controller = true; - let (cell, is_idle) = controller.on_commit_tick(); - if let Some(cell) = cell { - self.bottom_pane.hide_status_indicator(); - self.add_boxed_history(cell); - } - all_idle &= is_idle; - } - if let Some(controller) = self.plan_stream_controller.as_mut() { - has_controller = true; - let (cell, is_idle) = controller.on_commit_tick(); - if let Some(cell) = cell { - self.bottom_pane.hide_status_indicator(); - self.add_boxed_history(cell); - } - all_idle &= is_idle; + self.run_commit_tick(); + } + + /// Runs a regular periodic commit tick. + fn run_commit_tick(&mut self) { + self.run_commit_tick_with_scope(CommitTickScope::AnyMode); + } + + /// Runs an opportunistic commit tick only if catch-up mode is active. + fn run_catch_up_commit_tick(&mut self) { + self.run_commit_tick_with_scope(CommitTickScope::CatchUpOnly); + } + + /// Runs a commit tick for the current stream queue snapshot. + /// + /// `scope` controls whether this call may commit in smooth mode or only when catch-up + /// is currently active. + fn run_commit_tick_with_scope(&mut self, scope: CommitTickScope) { + let now = Instant::now(); + let outcome = run_commit_tick( + &mut self.adaptive_chunking, + self.stream_controller.as_mut(), + self.plan_stream_controller.as_mut(), + scope, + now, + ); + for cell in outcome.cells { + self.bottom_pane.hide_status_indicator(); + self.add_boxed_history(cell); } - if has_controller && all_idle { + + if outcome.has_controller && outcome.all_idle { self.app_event_tx.send(AppEvent::StopCommitAnimation); } } @@ -1924,6 +1944,7 @@ impl ChatWidget { && controller.push(&delta) { self.app_event_tx.send(AppEvent::StartCommitAnimation); + self.run_catch_up_commit_tick(); } self.request_redraw(); } @@ -2261,6 +2282,7 @@ impl ChatWidget { rate_limit_warnings: RateLimitWarningState::default(), rate_limit_switch_prompt: RateLimitSwitchPromptState::default(), rate_limit_poller: None, + adaptive_chunking: AdaptiveChunkingPolicy::default(), stream_controller: None, plan_stream_controller: None, running_commands: HashMap::new(), @@ -2406,6 +2428,7 @@ impl ChatWidget { rate_limit_warnings: RateLimitWarningState::default(), rate_limit_switch_prompt: RateLimitSwitchPromptState::default(), rate_limit_poller: None, + adaptive_chunking: AdaptiveChunkingPolicy::default(), stream_controller: None, plan_stream_controller: None, running_commands: HashMap::new(), @@ -2540,6 +2563,7 @@ impl ChatWidget { rate_limit_warnings: RateLimitWarningState::default(), rate_limit_switch_prompt: RateLimitSwitchPromptState::default(), rate_limit_poller: None, + adaptive_chunking: AdaptiveChunkingPolicy::default(), stream_controller: None, plan_stream_controller: None, running_commands: HashMap::new(), diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index a70d93a2c40..35654ace3a7 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -808,6 +808,7 @@ async fn make_chatwidget_manual( rate_limit_warnings: RateLimitWarningState::default(), rate_limit_switch_prompt: RateLimitSwitchPromptState::default(), rate_limit_poller: None, + adaptive_chunking: crate::streaming::chunking::AdaptiveChunkingPolicy::default(), stream_controller: None, plan_stream_controller: None, running_commands: HashMap::new(), diff --git a/codex-rs/tui/src/pager_overlay.rs b/codex-rs/tui/src/pager_overlay.rs index 5f3f79e3205..58f15fa7f36 100644 --- a/codex-rs/tui/src/pager_overlay.rs +++ b/codex-rs/tui/src/pager_overlay.rs @@ -17,7 +17,6 @@ use std::io::Result; use std::sync::Arc; -use std::time::Duration; use crate::chatwidget::ActiveCellTranscriptKey; use crate::history_cell::HistoryCell; @@ -299,7 +298,7 @@ impl PagerView { } } tui.frame_requester() - .schedule_frame_in(Duration::from_millis(16)); + .schedule_frame_in(crate::tui::TARGET_FRAME_INTERVAL); Ok(()) } diff --git a/codex-rs/tui/src/streaming/chunking.rs b/codex-rs/tui/src/streaming/chunking.rs new file mode 100644 index 00000000000..b653f51ecb6 --- /dev/null +++ b/codex-rs/tui/src/streaming/chunking.rs @@ -0,0 +1,439 @@ +//! Adaptive stream chunking policy for commit animation ticks. +//! +//! This policy preserves the baseline user experience while adapting to bursty +//! stream input. In [`ChunkingMode::Smooth`], one queued line is drained per +//! baseline commit tick. When queue pressure rises, it switches to +//! [`ChunkingMode::CatchUp`] and drains queued backlog immediately so display +//! lag converges as quickly as possible. +//! +//! The policy is source-agnostic: it depends only on queue depth and queue +//! age from [`QueueSnapshot`]. It does not branch on source identity or explicit +//! throughput targets. +//! +//! # Mental model +//! +//! Think of this as a two-gear system: +//! +//! - [`ChunkingMode::Smooth`]: steady baseline display pacing. +//! - [`ChunkingMode::CatchUp`]: full queue draining while backlog exists. +//! +//! The transition logic intentionally uses hysteresis: +//! +//! - enter catch-up on higher-pressure thresholds +//! - exit catch-up on lower-pressure thresholds, held for [`EXIT_HOLD`] +//! - after exit, suppress immediate re-entry for [`REENTER_CATCH_UP_HOLD`] +//! unless backlog is severe +//! +//! This avoids rapid gear-flapping near threshold boundaries. +//! +//! # Policy flow +//! +//! On each decision tick, [`AdaptiveChunkingPolicy::decide`] does: +//! +//! 1. If queue is empty, reset to [`ChunkingMode::Smooth`]. +//! 2. If currently smooth, call [`AdaptiveChunkingPolicy::maybe_enter_catch_up`]. +//! 3. If currently catch-up, call [`AdaptiveChunkingPolicy::maybe_exit_catch_up`]. +//! 4. Build [`DrainPlan`] (`Single` for smooth, `Batch(queued_lines)` for catch-up). +//! +//! # Concrete examples +//! +//! With current defaults: +//! +//! - `Smooth` drains one line per commit tick. +//! - `CatchUp` drains all currently queued lines in a tick. +//! +//! # Tuning guide (in code terms) +//! +//! Prefer tuning in this order so causes remain clear: +//! +//! 1. enter/exit thresholds: [`ENTER_QUEUE_DEPTH_LINES`], [`ENTER_OLDEST_AGE`], +//! [`EXIT_QUEUE_DEPTH_LINES`], [`EXIT_OLDEST_AGE`] +//! 2. hysteresis windows: [`EXIT_HOLD`], [`REENTER_CATCH_UP_HOLD`] +//! 3. severe gates: [`SEVERE_QUEUE_DEPTH_LINES`], [`SEVERE_OLDEST_AGE`] +//! +//! Symptom-oriented adjustments: +//! +//! - lag starts too late: lower enter thresholds +//! - frequent smooth/catch-up chatter: increase hold windows, or tighten exit +//! thresholds +//! - catch-up re-enters too eagerly after exit: increase re-entry hold +//! +//! # Responsibilities +//! +//! - track mode and hysteresis state +//! - produce deterministic [`ChunkingDecision`] values from queue snapshots +//! - preserve queue order by draining from queue head only +//! +//! # Non-responsibilities +//! +//! - scheduling commit ticks +//! - reordering stream lines +//! - transport/source-specific semantics +//! +//! Markdown docs remain supplemental: +//! +//! - `docs/tui-stream-chunking-review.md` +//! - `docs/tui-stream-chunking-tuning.md` +//! - `docs/tui-stream-chunking-validation.md` + +use std::time::Duration; +use std::time::Instant; + +/// Queue-depth threshold that allows entering catch-up mode. +/// +/// Crossing this threshold alone is sufficient to leave smooth mode. +const ENTER_QUEUE_DEPTH_LINES: usize = 8; + +/// Oldest-line age threshold that allows entering catch-up mode. +/// +/// Crossing this threshold alone is sufficient to leave smooth mode. +const ENTER_OLDEST_AGE: Duration = Duration::from_millis(120); + +/// Queue-depth threshold used when evaluating catch-up exit hysteresis. +/// +/// Depth must be at or below this value before exit hold timing can begin. +const EXIT_QUEUE_DEPTH_LINES: usize = 2; + +/// Oldest-line age threshold used when evaluating catch-up exit hysteresis. +/// +/// Age must be at or below this value before exit hold timing can begin. +const EXIT_OLDEST_AGE: Duration = Duration::from_millis(40); + +/// Minimum duration queue pressure must stay below exit thresholds to leave catch-up mode. +const EXIT_HOLD: Duration = Duration::from_millis(250); + +/// Cooldown window after a catch-up exit that suppresses immediate re-entry. +/// +/// Severe backlog still bypasses this hold to avoid unbounded queue-age growth. +const REENTER_CATCH_UP_HOLD: Duration = Duration::from_millis(250); + +/// Queue-depth cutoff that marks backlog as severe for faster convergence. +/// +/// This threshold is used to bypass re-entry hold after a recent catch-up exit. +const SEVERE_QUEUE_DEPTH_LINES: usize = 64; + +/// Oldest-line age cutoff that marks backlog as severe for faster convergence. +const SEVERE_OLDEST_AGE: Duration = Duration::from_millis(300); + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub(crate) enum ChunkingMode { + /// Drain one line per baseline commit tick. + #[default] + Smooth, + /// Drain multiple lines per tick according to queue pressure. + CatchUp, +} + +/// Captures queue pressure inputs used by adaptive chunking decisions. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub(crate) struct QueueSnapshot { + /// Number of queued stream lines waiting to be displayed. + pub(crate) queued_lines: usize, + /// Age of the oldest queued line at decision time. + pub(crate) oldest_age: Option, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum DrainPlan { + /// Emit exactly one queued line. + Single, + /// Emit up to `usize` queued lines. + Batch(usize), +} + +/// Represents one policy decision for a specific queue snapshot. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct ChunkingDecision { + /// Mode after applying hysteresis transitions for this decision. + pub(crate) mode: ChunkingMode, + /// Whether this decision transitioned from `Smooth` into `CatchUp`. + pub(crate) entered_catch_up: bool, + /// Drain plan to execute for the current commit tick. + pub(crate) drain_plan: DrainPlan, +} + +/// Maintains adaptive chunking mode and hysteresis state across ticks. +#[derive(Debug, Default)] +pub(crate) struct AdaptiveChunkingPolicy { + mode: ChunkingMode, + below_exit_threshold_since: Option, + last_catch_up_exit_at: Option, +} + +impl AdaptiveChunkingPolicy { + /// Returns the policy mode used by the most recent decision. + pub(crate) fn mode(&self) -> ChunkingMode { + self.mode + } + + /// Resets state to baseline smooth mode. + pub(crate) fn reset(&mut self) { + self.mode = ChunkingMode::Smooth; + self.below_exit_threshold_since = None; + self.last_catch_up_exit_at = None; + } + + /// Computes a drain decision from the current queue snapshot. + /// + /// The decision is deterministic for a given `(mode, snapshot, now)` triple. Callers should + /// avoid inventing synthetic snapshots; stale queue age data can cause premature catch-up exits. + pub(crate) fn decide(&mut self, snapshot: QueueSnapshot, now: Instant) -> ChunkingDecision { + if snapshot.queued_lines == 0 { + self.note_catch_up_exit(now); + self.mode = ChunkingMode::Smooth; + self.below_exit_threshold_since = None; + return ChunkingDecision { + mode: self.mode, + entered_catch_up: false, + drain_plan: DrainPlan::Single, + }; + } + + let entered_catch_up = match self.mode { + ChunkingMode::Smooth => self.maybe_enter_catch_up(snapshot, now), + ChunkingMode::CatchUp => { + self.maybe_exit_catch_up(snapshot, now); + false + } + }; + + let drain_plan = match self.mode { + ChunkingMode::Smooth => DrainPlan::Single, + ChunkingMode::CatchUp => DrainPlan::Batch(snapshot.queued_lines.max(1)), + }; + + ChunkingDecision { + mode: self.mode, + entered_catch_up, + drain_plan, + } + } + + /// Switches from `Smooth` to `CatchUp` when enter thresholds are crossed. + /// + /// Returns `true` only on the transition tick so callers can emit one-shot + /// transition observability. + fn maybe_enter_catch_up(&mut self, snapshot: QueueSnapshot, now: Instant) -> bool { + if !should_enter_catch_up(snapshot) { + return false; + } + if self.reentry_hold_active(now) && !is_severe_backlog(snapshot) { + return false; + } + self.mode = ChunkingMode::CatchUp; + self.below_exit_threshold_since = None; + self.last_catch_up_exit_at = None; + true + } + + /// Applies exit hysteresis while in `CatchUp` mode. + /// + /// The policy requires queue pressure to stay below exit thresholds for the + /// full `EXIT_HOLD` window before returning to `Smooth`. + fn maybe_exit_catch_up(&mut self, snapshot: QueueSnapshot, now: Instant) { + if !should_exit_catch_up(snapshot) { + self.below_exit_threshold_since = None; + return; + } + + match self.below_exit_threshold_since { + Some(since) if now.saturating_duration_since(since) >= EXIT_HOLD => { + self.mode = ChunkingMode::Smooth; + self.below_exit_threshold_since = None; + self.last_catch_up_exit_at = Some(now); + } + Some(_) => {} + None => { + self.below_exit_threshold_since = Some(now); + } + } + } + + fn note_catch_up_exit(&mut self, now: Instant) { + if self.mode == ChunkingMode::CatchUp { + self.last_catch_up_exit_at = Some(now); + } + } + + fn reentry_hold_active(&self, now: Instant) -> bool { + self.last_catch_up_exit_at + .is_some_and(|exit| now.saturating_duration_since(exit) < REENTER_CATCH_UP_HOLD) + } +} + +/// Returns whether current queue pressure warrants entering catch-up mode. +/// +/// Either depth or age pressure is sufficient to trigger catch-up. +fn should_enter_catch_up(snapshot: QueueSnapshot) -> bool { + snapshot.queued_lines >= ENTER_QUEUE_DEPTH_LINES + || snapshot + .oldest_age + .is_some_and(|oldest| oldest >= ENTER_OLDEST_AGE) +} + +/// Returns whether queue pressure is low enough to begin exit hysteresis. +/// +/// Both depth and age must be below thresholds; this prevents oscillation when +/// one signal is still under load. +fn should_exit_catch_up(snapshot: QueueSnapshot) -> bool { + snapshot.queued_lines <= EXIT_QUEUE_DEPTH_LINES + && snapshot + .oldest_age + .is_some_and(|oldest| oldest <= EXIT_OLDEST_AGE) +} + +/// Returns whether backlog is severe enough to use a faster catch-up target. +/// +/// Severe pressure bypasses re-entry hold to avoid queue-age growth after a +/// recent catch-up exit. +fn is_severe_backlog(snapshot: QueueSnapshot) -> bool { + snapshot.queued_lines >= SEVERE_QUEUE_DEPTH_LINES + || snapshot + .oldest_age + .is_some_and(|oldest| oldest >= SEVERE_OLDEST_AGE) +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + fn snapshot(queued_lines: usize, oldest_age_ms: u64) -> QueueSnapshot { + QueueSnapshot { + queued_lines, + oldest_age: Some(Duration::from_millis(oldest_age_ms)), + } + } + + #[test] + fn smooth_mode_is_default() { + let mut policy = AdaptiveChunkingPolicy::default(); + let now = Instant::now(); + + let decision = policy.decide(snapshot(1, 10), now); + assert_eq!(decision.mode, ChunkingMode::Smooth); + assert_eq!(decision.entered_catch_up, false); + assert_eq!(decision.drain_plan, DrainPlan::Single); + } + + #[test] + fn enters_catch_up_on_depth_threshold() { + let mut policy = AdaptiveChunkingPolicy::default(); + let now = Instant::now(); + + let decision = policy.decide(snapshot(8, 10), now); + assert_eq!(decision.mode, ChunkingMode::CatchUp); + assert_eq!(decision.entered_catch_up, true); + assert_eq!(decision.drain_plan, DrainPlan::Batch(8)); + } + + #[test] + fn enters_catch_up_on_age_threshold() { + let mut policy = AdaptiveChunkingPolicy::default(); + let now = Instant::now(); + + let decision = policy.decide(snapshot(2, 120), now); + assert_eq!(decision.mode, ChunkingMode::CatchUp); + assert_eq!(decision.entered_catch_up, true); + assert_eq!(decision.drain_plan, DrainPlan::Batch(2)); + } + + #[test] + fn severe_backlog_uses_faster_paced_batches() { + let mut policy = AdaptiveChunkingPolicy::default(); + let now = Instant::now(); + let _ = policy.decide(snapshot(9, 10), now); + + let decision = policy.decide(snapshot(64, 10), now + Duration::from_millis(5)); + assert_eq!(decision.mode, ChunkingMode::CatchUp); + assert_eq!(decision.drain_plan, DrainPlan::Batch(64)); + } + + #[test] + fn catch_up_batch_drains_current_backlog() { + let mut policy = AdaptiveChunkingPolicy::default(); + let now = Instant::now(); + let decision = policy.decide(snapshot(512, 400), now); + assert_eq!(decision.mode, ChunkingMode::CatchUp); + assert_eq!(decision.drain_plan, DrainPlan::Batch(512)); + } + + #[test] + fn exits_catch_up_after_hysteresis_hold() { + let mut policy = AdaptiveChunkingPolicy::default(); + let t0 = Instant::now(); + + let _ = policy.decide(snapshot(9, 10), t0); + assert_eq!(policy.mode(), ChunkingMode::CatchUp); + + let pre_hold = policy.decide(snapshot(2, 40), t0 + Duration::from_millis(200)); + assert_eq!(pre_hold.mode, ChunkingMode::CatchUp); + + let post_hold = policy.decide(snapshot(2, 40), t0 + Duration::from_millis(460)); + assert_eq!(post_hold.mode, ChunkingMode::Smooth); + assert_eq!(post_hold.drain_plan, DrainPlan::Single); + } + + #[test] + fn drops_back_to_smooth_when_idle() { + let mut policy = AdaptiveChunkingPolicy::default(); + let now = Instant::now(); + let _ = policy.decide(snapshot(9, 10), now); + assert_eq!(policy.mode(), ChunkingMode::CatchUp); + + let decision = policy.decide( + QueueSnapshot { + queued_lines: 0, + oldest_age: None, + }, + now + Duration::from_millis(20), + ); + assert_eq!(decision.mode, ChunkingMode::Smooth); + assert_eq!(decision.drain_plan, DrainPlan::Single); + } + + #[test] + fn holds_reentry_after_catch_up_exit() { + let mut policy = AdaptiveChunkingPolicy::default(); + let t0 = Instant::now(); + + let entered = policy.decide(snapshot(8, 20), t0); + assert_eq!(entered.mode, ChunkingMode::CatchUp); + + let drained = policy.decide( + QueueSnapshot { + queued_lines: 0, + oldest_age: None, + }, + t0 + Duration::from_millis(20), + ); + assert_eq!(drained.mode, ChunkingMode::Smooth); + + let held = policy.decide(snapshot(8, 20), t0 + Duration::from_millis(120)); + assert_eq!(held.mode, ChunkingMode::Smooth); + assert_eq!(held.drain_plan, DrainPlan::Single); + + let reentered = policy.decide(snapshot(8, 20), t0 + Duration::from_millis(320)); + assert_eq!(reentered.mode, ChunkingMode::CatchUp); + assert_eq!(reentered.drain_plan, DrainPlan::Batch(8)); + } + + #[test] + fn severe_backlog_can_reenter_during_hold() { + let mut policy = AdaptiveChunkingPolicy::default(); + let t0 = Instant::now(); + + let _ = policy.decide(snapshot(8, 20), t0); + let _ = policy.decide( + QueueSnapshot { + queued_lines: 0, + oldest_age: None, + }, + t0 + Duration::from_millis(20), + ); + + let severe = policy.decide(snapshot(64, 20), t0 + Duration::from_millis(120)); + assert_eq!(severe.mode, ChunkingMode::CatchUp); + assert_eq!(severe.drain_plan, DrainPlan::Batch(64)); + } +} diff --git a/codex-rs/tui/src/streaming/commit_tick.rs b/codex-rs/tui/src/streaming/commit_tick.rs new file mode 100644 index 00000000000..bda63bccf63 --- /dev/null +++ b/codex-rs/tui/src/streaming/commit_tick.rs @@ -0,0 +1,224 @@ +//! Orchestrates commit-tick drains across streaming controllers. +//! +//! This module bridges queue-based chunking policy (`chunking`) with the concrete stream +//! controllers (`controller`). Callers provide the current controllers and tick scope; the module +//! computes queue pressure, selects a drain plan, applies it, and returns emitted history cells. +//! +//! The module preserves ordering by draining only from controller queue heads. It does not schedule +//! ticks and it does not mutate UI state directly; callers remain responsible for animation events +//! and history insertion side effects. +//! +//! The main flow is: +//! [`run_commit_tick`] -> [`stream_queue_snapshot`] -> [`QueueSnapshot`] -> +//! [`resolve_chunking_plan`] -> [`ChunkingDecision`]/[`DrainPlan`] -> +//! [`apply_commit_tick_plan`] -> [`CommitTickOutput`]. + +use std::time::Duration; +use std::time::Instant; + +use crate::history_cell::HistoryCell; + +use super::chunking::AdaptiveChunkingPolicy; +use super::chunking::ChunkingDecision; +use super::chunking::ChunkingMode; +use super::chunking::DrainPlan; +use super::chunking::QueueSnapshot; +use super::controller::PlanStreamController; +use super::controller::StreamController; + +/// Describes whether a commit tick may run in all modes or only in catch-up mode. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum CommitTickScope { + /// Always run the tick, regardless of current chunking mode. + AnyMode, + /// Run model transitions and policy updates, but commit lines only in `CatchUp`. + CatchUpOnly, +} + +/// Describes what a single commit tick produced. +pub(crate) struct CommitTickOutput { + /// Cells produced by drained stream lines during this tick. + pub(crate) cells: Vec>, + /// Whether at least one stream controller was present for this tick. + pub(crate) has_controller: bool, + /// Whether all present controllers were idle after this tick. + pub(crate) all_idle: bool, +} + +impl Default for CommitTickOutput { + /// Creates an output that represents "no commit performed". + /// + /// This is used when a tick is intentionally suppressed, for example when + /// the scope is [`CommitTickScope::CatchUpOnly`] and policy is not in catch-up mode. + fn default() -> Self { + Self { + cells: Vec::new(), + has_controller: false, + all_idle: true, + } + } +} + +/// Runs one commit tick against the provided stream controllers. +/// +/// This function collects a [`QueueSnapshot`], asks [`AdaptiveChunkingPolicy`] for a +/// [`ChunkingDecision`], and then applies the resulting [`DrainPlan`] to both controllers. +/// If callers pass stale controller references (for example, references not tied to the +/// current turn), queue age can be misread and the policy may stay in catch-up longer +/// than expected. +pub(crate) fn run_commit_tick( + policy: &mut AdaptiveChunkingPolicy, + stream_controller: Option<&mut StreamController>, + plan_stream_controller: Option<&mut PlanStreamController>, + scope: CommitTickScope, + now: Instant, +) -> CommitTickOutput { + let snapshot = stream_queue_snapshot( + stream_controller.as_deref(), + plan_stream_controller.as_deref(), + now, + ); + let decision = resolve_chunking_plan(policy, snapshot, now); + if scope == CommitTickScope::CatchUpOnly && decision.mode != ChunkingMode::CatchUp { + return CommitTickOutput::default(); + } + + let output = apply_commit_tick_plan( + decision.drain_plan, + stream_controller, + plan_stream_controller, + ); + tracing::trace!( + mode = ?decision.mode, + queued_lines = snapshot.queued_lines, + oldest_queued_age_ms = snapshot.oldest_age.map(|age| age.as_millis() as u64), + drain_plan = ?decision.drain_plan, + has_controller = output.has_controller, + all_idle = output.all_idle, + "stream chunking commit tick" + ); + output +} + +/// Builds the combined queue-pressure snapshot consumed by chunking policy. +/// +/// The snapshot sums queue depth across controllers and keeps the maximum oldest age +/// so policy decisions reflect the most delayed queued line currently visible. +fn stream_queue_snapshot( + stream_controller: Option<&StreamController>, + plan_stream_controller: Option<&PlanStreamController>, + now: Instant, +) -> QueueSnapshot { + let mut queued_lines = 0usize; + let mut oldest_age: Option = None; + + if let Some(controller) = stream_controller { + queued_lines += controller.queued_lines(); + oldest_age = max_duration(oldest_age, controller.oldest_queued_age(now)); + } + if let Some(controller) = plan_stream_controller { + queued_lines += controller.queued_lines(); + oldest_age = max_duration(oldest_age, controller.oldest_queued_age(now)); + } + + QueueSnapshot { + queued_lines, + oldest_age, + } +} + +/// Computes one policy decision and emits a trace log on mode transitions. +/// +/// This keeps policy transition logging in one place so callers can rely on +/// [`run_commit_tick`] to provide consistent observability. +fn resolve_chunking_plan( + policy: &mut AdaptiveChunkingPolicy, + snapshot: QueueSnapshot, + now: Instant, +) -> ChunkingDecision { + let prior_mode = policy.mode(); + let decision = policy.decide(snapshot, now); + if decision.mode != prior_mode { + tracing::trace!( + prior_mode = ?prior_mode, + new_mode = ?decision.mode, + queued_lines = snapshot.queued_lines, + oldest_queued_age_ms = snapshot.oldest_age.map(|age| age.as_millis() as u64), + entered_catch_up = decision.entered_catch_up, + "stream chunking mode transition" + ); + } + decision +} + +/// Applies a [`DrainPlan`] to all available stream controllers. +/// +/// The returned [`CommitTickOutput`] reports emitted cells and whether all +/// present controllers are idle after draining. +fn apply_commit_tick_plan( + drain_plan: DrainPlan, + stream_controller: Option<&mut StreamController>, + plan_stream_controller: Option<&mut PlanStreamController>, +) -> CommitTickOutput { + let mut output = CommitTickOutput::default(); + + if let Some(controller) = stream_controller { + output.has_controller = true; + let (cell, is_idle) = drain_stream_controller(controller, drain_plan); + if let Some(cell) = cell { + output.cells.push(cell); + } + output.all_idle &= is_idle; + } + if let Some(controller) = plan_stream_controller { + output.has_controller = true; + let (cell, is_idle) = drain_plan_stream_controller(controller, drain_plan); + if let Some(cell) = cell { + output.cells.push(cell); + } + output.all_idle &= is_idle; + } + + output +} + +/// Applies one drain step to the main stream controller. +/// +/// [`DrainPlan::Single`] maps to one-line drain; [`DrainPlan::Batch`] maps to +/// multi-line drain (including instant catch-up when policy requests the full +/// queued backlog). +fn drain_stream_controller( + controller: &mut StreamController, + drain_plan: DrainPlan, +) -> (Option>, bool) { + match drain_plan { + DrainPlan::Single => controller.on_commit_tick(), + DrainPlan::Batch(max_lines) => controller.on_commit_tick_batch(max_lines), + } +} + +/// Applies one drain step to the plan stream controller. +/// +/// This mirrors [`drain_stream_controller`] so both controller types follow the +/// same chunking policy decisions. +fn drain_plan_stream_controller( + controller: &mut PlanStreamController, + drain_plan: DrainPlan, +) -> (Option>, bool) { + match drain_plan { + DrainPlan::Single => controller.on_commit_tick(), + DrainPlan::Batch(max_lines) => controller.on_commit_tick_batch(max_lines), + } +} + +/// Returns the greater of two optional durations. +/// +/// This helper preserves whichever side is present when only one duration exists. +fn max_duration(lhs: Option, rhs: Option) -> Option { + match (lhs, rhs) { + (Some(left), Some(right)) => Some(left.max(right)), + (Some(left), None) => Some(left), + (None, Some(right)) => Some(right), + (None, None) => None, + } +} diff --git a/codex-rs/tui/src/streaming/controller.rs b/codex-rs/tui/src/streaming/controller.rs index 462962e980b..6117485adf3 100644 --- a/codex-rs/tui/src/streaming/controller.rs +++ b/codex-rs/tui/src/streaming/controller.rs @@ -4,6 +4,8 @@ use crate::render::line_utils::prefix_lines; use crate::style::proposed_plan_style; use ratatui::prelude::Stylize; use ratatui::text::Line; +use std::time::Duration; +use std::time::Instant; use super::StreamState; @@ -71,6 +73,28 @@ impl StreamController { (self.emit(step), self.state.is_idle()) } + /// Step animation: commit at most `max_lines` queued lines. + /// + /// This is intended for adaptive catch-up drains. Callers should keep `max_lines` bounded; a + /// very large value can collapse perceived animation into a single jump. + pub(crate) fn on_commit_tick_batch( + &mut self, + max_lines: usize, + ) -> (Option>, bool) { + let step = self.state.drain_n(max_lines.max(1)); + (self.emit(step), self.state.is_idle()) + } + + /// Returns the current number of queued lines waiting to be displayed. + pub(crate) fn queued_lines(&self) -> usize { + self.state.queued_len() + } + + /// Returns the age of the oldest queued line. + pub(crate) fn oldest_queued_age(&self, now: Instant) -> Option { + self.state.oldest_queued_age(now) + } + fn emit(&mut self, lines: Vec>) -> Option> { if lines.is_empty() { return None; @@ -142,6 +166,28 @@ impl PlanStreamController { (self.emit(step, false), self.state.is_idle()) } + /// Step animation: commit at most `max_lines` queued lines. + /// + /// This is intended for adaptive catch-up drains. Callers should keep `max_lines` bounded; a + /// very large value can collapse perceived animation into a single jump. + pub(crate) fn on_commit_tick_batch( + &mut self, + max_lines: usize, + ) -> (Option>, bool) { + let step = self.state.drain_n(max_lines.max(1)); + (self.emit(step, false), self.state.is_idle()) + } + + /// Returns the current number of queued plan lines waiting to be displayed. + pub(crate) fn queued_lines(&self) -> usize { + self.state.queued_len() + } + + /// Returns the age of the oldest queued plan line. + pub(crate) fn oldest_queued_age(&self, now: Instant) -> Option { + self.state.oldest_queued_age(now) + } + fn emit( &mut self, lines: Vec>, diff --git a/codex-rs/tui/src/streaming/mod.rs b/codex-rs/tui/src/streaming/mod.rs index fa00702834c..c783f27ae95 100644 --- a/codex-rs/tui/src/streaming/mod.rs +++ b/codex-rs/tui/src/streaming/mod.rs @@ -1,17 +1,39 @@ +//! Streaming primitives used by the TUI transcript pipeline. +//! +//! `StreamState` owns newline-gated markdown collection and a FIFO queue of committed render lines. +//! Higher-level modules build on top of this state: +//! - `controller` adapts queued lines into `HistoryCell` emission rules for message and plan streams. +//! - `chunking` computes adaptive drain plans from queue pressure. +//! - `commit_tick` binds policy decisions to concrete controller drains. +//! +//! The key invariant is queue ordering. All drains pop from the front, and enqueue records an +//! arrival timestamp so policy code can reason about oldest queued age without peeking into text. + use std::collections::VecDeque; +use std::time::Duration; +use std::time::Instant; use ratatui::text::Line; use crate::markdown_stream::MarkdownStreamCollector; +pub(crate) mod chunking; +pub(crate) mod commit_tick; pub(crate) mod controller; +struct QueuedLine { + line: Line<'static>, + enqueued_at: Instant, +} + +/// Holds in-flight markdown stream state and queued committed lines. pub(crate) struct StreamState { pub(crate) collector: MarkdownStreamCollector, - queued_lines: VecDeque>, + queued_lines: VecDeque, pub(crate) has_seen_delta: bool, } impl StreamState { + /// Creates an empty stream state with an optional target wrap width. pub(crate) fn new(width: Option) -> Self { Self { collector: MarkdownStreamCollector::new(width), @@ -19,21 +41,75 @@ impl StreamState { has_seen_delta: false, } } + /// Resets collector and queue state for the next stream lifecycle. pub(crate) fn clear(&mut self) { self.collector.clear(); self.queued_lines.clear(); self.has_seen_delta = false; } + /// Drains one queued line from the front of the queue. pub(crate) fn step(&mut self) -> Vec> { - self.queued_lines.pop_front().into_iter().collect() + self.queued_lines + .pop_front() + .map(|queued| queued.line) + .into_iter() + .collect() } + /// Drains up to `max_lines` queued lines from the front of the queue. + /// + /// Callers that pass very large values still get bounded behavior because this method clamps to + /// the currently available queue length. + pub(crate) fn drain_n(&mut self, max_lines: usize) -> Vec> { + let end = max_lines.min(self.queued_lines.len()); + self.queued_lines + .drain(..end) + .map(|queued| queued.line) + .collect() + } + /// Drains all queued lines from the front of the queue. pub(crate) fn drain_all(&mut self) -> Vec> { - self.queued_lines.drain(..).collect() + self.queued_lines + .drain(..) + .map(|queued| queued.line) + .collect() } + /// Returns whether no lines are queued for commit. pub(crate) fn is_idle(&self) -> bool { self.queued_lines.is_empty() } + /// Returns the current queue depth. + pub(crate) fn queued_len(&self) -> usize { + self.queued_lines.len() + } + /// Returns the age of the oldest queued line. + pub(crate) fn oldest_queued_age(&self, now: Instant) -> Option { + self.queued_lines + .front() + .map(|queued| now.saturating_duration_since(queued.enqueued_at)) + } + /// Appends committed lines to the queue with a shared enqueue timestamp. pub(crate) fn enqueue(&mut self, lines: Vec>) { - self.queued_lines.extend(lines); + let now = Instant::now(); + self.queued_lines + .extend(lines.into_iter().map(|line| QueuedLine { + line, + enqueued_at: now, + })); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn drain_n_clamps_to_available_lines() { + let mut state = StreamState::new(None); + state.enqueue(vec![Line::from("one")]); + + let drained = state.drain_n(8); + assert_eq!(drained, vec![Line::from("one")]); + assert!(state.is_idle()); } } diff --git a/codex-rs/tui/src/tui.rs b/codex-rs/tui/src/tui.rs index 761fa836273..a77785c1b8a 100644 --- a/codex-rs/tui/src/tui.rs +++ b/codex-rs/tui/src/tui.rs @@ -10,6 +10,7 @@ use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; +use std::time::Duration; use crossterm::Command; use crossterm::SynchronizedUpdate; @@ -52,6 +53,9 @@ mod frame_requester; #[cfg(unix)] mod job_control; +/// Target frame interval for UI redraw scheduling. +pub(crate) const TARGET_FRAME_INTERVAL: Duration = frame_rate_limiter::MIN_FRAME_INTERVAL; + /// A type alias for the terminal type used in this application pub type Terminal = CustomTerminal>; diff --git a/codex-rs/tui/src/tui/frame_rate_limiter.rs b/codex-rs/tui/src/tui/frame_rate_limiter.rs index 56dd752e643..6f4ad8e7479 100644 --- a/codex-rs/tui/src/tui/frame_rate_limiter.rs +++ b/codex-rs/tui/src/tui/frame_rate_limiter.rs @@ -1,7 +1,7 @@ //! Limits how frequently frame draw notifications may be emitted. //! //! Widgets sometimes call `FrameRequester::schedule_frame()` more frequently than a user can -//! perceive. This limiter clamps draw notifications to a maximum of 60 FPS to avoid wasted work. +//! perceive. This limiter clamps draw notifications to a maximum of 120 FPS to avoid wasted work. //! //! This is intentionally a small, pure helper so it can be unit-tested in isolation and used by //! the async frame scheduler without adding complexity to the app/event loop. @@ -9,8 +9,8 @@ use std::time::Duration; use std::time::Instant; -/// A 60 FPS minimum frame interval (≈16.67ms). -pub(super) const MIN_FRAME_INTERVAL: Duration = Duration::from_nanos(16_666_667); +/// A 120 FPS minimum frame interval (≈8.33ms). +pub(super) const MIN_FRAME_INTERVAL: Duration = Duration::from_nanos(8_333_334); /// Remembers the most recent emitted draw, allowing deadlines to be clamped forward. #[derive(Debug, Default)] diff --git a/codex-rs/tui/src/tui/frame_requester.rs b/codex-rs/tui/src/tui/frame_requester.rs index 88db9d71f3c..d7e54d82cc9 100644 --- a/codex-rs/tui/src/tui/frame_requester.rs +++ b/codex-rs/tui/src/tui/frame_requester.rs @@ -71,7 +71,7 @@ impl FrameRequester { /// /// This type is internal to `FrameRequester` and is spawned as a task to handle scheduling logic. /// -/// To avoid wasted redraw work, draw notifications are clamped to a maximum of 60 FPS (see +/// To avoid wasted redraw work, draw notifications are clamped to a maximum of 120 FPS (see /// [`FrameRateLimiter`]). struct FrameScheduler { receiver: mpsc::UnboundedReceiver, @@ -232,7 +232,7 @@ mod tests { } #[tokio::test(flavor = "current_thread", start_paused = true)] - async fn test_limits_draw_notifications_to_60fps() { + async fn test_limits_draw_notifications_to_120fps() { let (draw_tx, mut draw_rx) = broadcast::channel(16); let requester = FrameRequester::new(draw_tx); @@ -250,7 +250,7 @@ mod tests { let early = draw_rx.recv().timeout(Duration::from_millis(1)).await; assert!( early.is_err(), - "draw fired too early; expected max 60fps (min interval {MIN_FRAME_INTERVAL:?})" + "draw fired too early; expected max 120fps (min interval {MIN_FRAME_INTERVAL:?})" ); time::advance(MIN_FRAME_INTERVAL).await; @@ -278,11 +278,11 @@ mod tests { requester.schedule_frame_in(Duration::from_millis(1)); - time::advance(Duration::from_millis(10)).await; + time::advance(MIN_FRAME_INTERVAL / 2).await; let too_early = draw_rx.recv().timeout(Duration::from_millis(1)).await; assert!( too_early.is_err(), - "draw fired too early; expected max 60fps (min interval {MIN_FRAME_INTERVAL:?})" + "draw fired too early; expected max 120fps (min interval {MIN_FRAME_INTERVAL:?})" ); time::advance(MIN_FRAME_INTERVAL).await; diff --git a/docs/tui-stream-chunking-review.md b/docs/tui-stream-chunking-review.md new file mode 100644 index 00000000000..3722492ddf8 --- /dev/null +++ b/docs/tui-stream-chunking-review.md @@ -0,0 +1,124 @@ +# TUI Stream Chunking + +This document explains how stream chunking in the TUI works and why it is +implemented this way. + +## Problem + +Streaming output can arrive faster than a one-line-per-tick animation can show +it. If commit speed stays fixed while arrival speed spikes, queued lines grow +and visible output lags behind received output. + +## Design goals + +- Preserve existing baseline behavior under normal load. +- Reduce display lag when backlog builds. +- Keep output order stable. +- Avoid abrupt single-frame flushes that look jumpy. +- Keep policy transport-agnostic and based only on queue state. + +## Non-goals + +- The policy does not schedule animation ticks. +- The policy does not depend on upstream source identity. +- The policy does not reorder queued output. + +## Where the logic lives + +- `codex-rs/tui/src/streaming/chunking.rs` + - Adaptive policy, mode transitions, and drain-plan selection. +- `codex-rs/tui/src/streaming/commit_tick.rs` + - Orchestration for each commit tick: snapshot, decide, drain, trace. +- `codex-rs/tui/src/streaming/controller.rs` + - Queue/drain primitives used by commit-tick orchestration. +- `codex-rs/tui/src/chatwidget.rs` + - Integration point that invokes commit-tick orchestration and handles UI + lifecycle events. + +## Runtime flow + +On each commit tick: + +1. Build a queue snapshot across active controllers. + - `queued_lines`: total queued lines. + - `oldest_age`: max age of the oldest queued line across controllers. +2. Ask adaptive policy for a decision. + - Output: current mode and a drain plan. +3. Apply drain plan to each controller. +4. Emit drained `HistoryCell`s for insertion by the caller. +5. Emit trace logs for observability. + +In `CatchUpOnly` scope, policy state still advances, but draining is skipped +unless mode is currently `CatchUp`. + +## Modes and transitions + +Two modes are used: + +- `Smooth` + - Baseline behavior: one line drained per baseline commit tick. + - Baseline tick interval currently comes from + `tui/src/app.rs:COMMIT_ANIMATION_TICK` (~8.3ms, ~120fps). +- `CatchUp` + - Drain current queued backlog per tick via `Batch(queued_lines)`. + +Entry and exit use hysteresis: + +- Enter `CatchUp` when queue depth or queue age exceeds enter thresholds. +- Exit requires both depth and age to be below exit thresholds for a hold + window (`EXIT_HOLD`). + +This prevents oscillation when load hovers near thresholds. + +## Current experimental tuning values + +These are the current values in `streaming/chunking.rs` plus the baseline +commit tick in `tui/src/app.rs`. They are +experimental and may change as we gather more trace data. + +- Baseline commit tick: `~8.3ms` (`COMMIT_ANIMATION_TICK` in `app.rs`) +- Enter catch-up: + - `queued_lines >= 8` OR `oldest_age >= 120ms` +- Exit catch-up eligibility: + - `queued_lines <= 2` AND `oldest_age <= 40ms` +- Exit hold (`CatchUp -> Smooth`): `250ms` +- Re-entry hold after catch-up exit: `250ms` +- Severe backlog thresholds: + - `queued_lines >= 64` OR `oldest_age >= 300ms` + +## Drain planning + +In `Smooth`, plan is always `Single`. + +In `CatchUp`, plan is `Batch(queued_lines)`, which drains the currently queued +backlog for immediate convergence. + +## Why this design + +This keeps normal animation semantics intact, while making backlog behavior +adaptive: + +- Under normal load, behavior stays familiar and stable. +- Under pressure, queue age is reduced quickly without sacrificing ordering. +- Hysteresis avoids rapid mode flapping. + +## Invariants + +- Queue order is preserved. +- Empty queue resets policy back to `Smooth`. +- `CatchUp` exits only after sustained low pressure. +- Catch-up drains are immediate while in `CatchUp`. + +## Observability + +Trace events are emitted from commit-tick orchestration: + +- `stream chunking commit tick` + - `mode`, `queued_lines`, `oldest_queued_age_ms`, `drain_plan`, + `has_controller`, `all_idle` +- `stream chunking mode transition` + - `prior_mode`, `new_mode`, `queued_lines`, `oldest_queued_age_ms`, + `entered_catch_up` + +These events are intended to explain display lag by showing queue pressure, +selected drain behavior, and mode transitions over time. diff --git a/docs/tui-stream-chunking-tuning.md b/docs/tui-stream-chunking-tuning.md new file mode 100644 index 00000000000..d9a2ea5e213 --- /dev/null +++ b/docs/tui-stream-chunking-tuning.md @@ -0,0 +1,98 @@ +# TUI Stream Chunking Tuning Guide + +This document explains how to tune adaptive stream chunking constants without +changing the underlying policy shape. + +## Scope + +Use this guide when adjusting queue-pressure thresholds and hysteresis windows in +`codex-rs/tui/src/streaming/chunking.rs`, and baseline commit cadence in +`codex-rs/tui/src/app.rs`. + +This guide is about tuning behavior, not redesigning the policy. + +## Before tuning + +- Keep the baseline behavior intact: + - `Smooth` mode drains one line per baseline tick. + - `CatchUp` mode drains queued backlog immediately. +- Capture trace logs with: + - `codex_tui::streaming::commit_tick` +- Evaluate on sustained, bursty, and mixed-output prompts. + +See `docs/tui-stream-chunking-validation.md` for the measurement process. + +## Tuning goals + +Tune for all three goals together: + +- low visible lag under bursty output +- low mode flapping (`Smooth <-> CatchUp` chatter) +- stable catch-up entry/exit behavior under mixed workloads + +## Constants and what they control + +### Baseline commit cadence + +- `COMMIT_ANIMATION_TICK` (`tui/src/app.rs`) + - Lower values increase smooth-mode update cadence and reduce steady-state lag. + - Higher values increase smoothing and can increase perceived lag. + - This should usually move after chunking thresholds/holds are in a good range. + +### Enter/exit thresholds + +- `ENTER_QUEUE_DEPTH_LINES`, `ENTER_OLDEST_AGE` + - Lower values enter catch-up earlier (less lag, more mode switching risk). + - Higher values enter later (more lag tolerance, fewer mode switches). +- `EXIT_QUEUE_DEPTH_LINES`, `EXIT_OLDEST_AGE` + - Lower values keep catch-up active longer. + - Higher values allow earlier exit and may increase re-entry churn. + +### Hysteresis holds + +- `EXIT_HOLD` + - Longer hold reduces flip-flop exits when pressure is noisy. + - Too long can keep catch-up active after pressure has cleared. +- `REENTER_CATCH_UP_HOLD` + - Longer hold suppresses rapid re-entry after exit. + - Too long can delay needed catch-up for near-term bursts. + - Severe backlog bypasses this hold by design. + +### Severe-backlog gates + +- `SEVERE_QUEUE_DEPTH_LINES`, `SEVERE_OLDEST_AGE` + - Lower values bypass re-entry hold earlier. + - Higher values reserve hold bypass for only extreme pressure. + +## Recommended tuning order + +Tune in this order to keep cause/effect clear: + +1. Entry/exit thresholds (`ENTER_*`, `EXIT_*`) +2. Hold windows (`EXIT_HOLD`, `REENTER_CATCH_UP_HOLD`) +3. Severe gates (`SEVERE_*`) +4. Baseline cadence (`COMMIT_ANIMATION_TICK`) + +Change one logical group at a time and re-measure before the next group. + +## Symptom-driven adjustments + +- Too much lag before catch-up starts: + - lower `ENTER_QUEUE_DEPTH_LINES` and/or `ENTER_OLDEST_AGE` +- Frequent `Smooth -> CatchUp -> Smooth` chatter: + - increase `EXIT_HOLD` + - increase `REENTER_CATCH_UP_HOLD` + - tighten exit thresholds (lower `EXIT_*`) +- Catch-up engages too often for short bursts: + - increase `ENTER_QUEUE_DEPTH_LINES` and/or `ENTER_OLDEST_AGE` + - increase `REENTER_CATCH_UP_HOLD` +- Catch-up engages too late: + - lower `ENTER_QUEUE_DEPTH_LINES` and/or `ENTER_OLDEST_AGE` + - lower severe gates (`SEVERE_*`) to bypass re-entry hold sooner + +## Validation checklist after each tuning pass + +- `cargo test -p codex-tui` passes. +- Trace window shows bounded queue-age behavior. +- Mode transitions are not concentrated in repeated short-interval cycles. +- Catch-up clears backlog quickly once mode enters `CatchUp`. diff --git a/docs/tui-stream-chunking-validation.md b/docs/tui-stream-chunking-validation.md new file mode 100644 index 00000000000..26bd949da60 --- /dev/null +++ b/docs/tui-stream-chunking-validation.md @@ -0,0 +1,105 @@ +# TUI Stream Chunking Validation Process + +This document records the process used to validate adaptive stream chunking +and anti-flap behavior. + +## Scope + +The goal is to verify two properties from runtime traces: + +- display lag is reduced when queue pressure rises +- mode transitions remain stable instead of rapidly flapping + +## Trace targets + +Chunking observability is emitted by: + +- `codex_tui::streaming::commit_tick` + +Two trace messages are used: + +- `stream chunking commit tick` +- `stream chunking mode transition` + +## Runtime command + +Run Codex with chunking traces enabled: + +```bash +RUST_LOG='codex_tui::streaming::commit_tick=trace,codex_tui=info,codex_core=info,codex_rmcp_client=info' \ + just codex --enable=responses_websockets +``` + +## Log capture process + +1. Record the current size of `~/.codex/log/codex-tui.log` as a start offset. +2. Run an interactive prompt that produces sustained streamed output. +3. Stop the run. +4. Parse only log bytes written after the recorded offset. + +This avoids mixing earlier sessions with the current measurement window. + +## Metrics reviewed + +For each measured window: + +- `commit_ticks` +- `mode_transitions` +- `smooth_ticks` +- `catchup_ticks` +- drain-plan distribution (`Single`, `Batch(n)`) +- queue depth (`max`, `p95`, `p99`) +- oldest queued age (`max`, `p95`, `p99`) +- rapid re-entry count: + - number of `Smooth -> CatchUp` transitions within 1 second of a + `CatchUp -> Smooth` transition + +## Interpretation + +- Healthy behavior: + - queue age remains bounded while backlog is drained + - transition count is low relative to total ticks + - rapid re-entry events are infrequent and localized to burst boundaries +- Regressed behavior: + - repeated short-interval mode toggles across an extended window + - persistent queue-age growth while in smooth mode + - long catch-up runs without backlog reduction + +## Experiment history + +This section captures the major tuning passes so future work can build on +what has already been tried. + +- Baseline + - One-line smooth draining with a 50ms commit tick. + - This preserved familiar pacing but could feel laggy under sustained + backlog. +- Pass 1: instant catch-up, baseline tick unchanged + - Kept smooth-mode semantics but made catch-up drain the full queued + backlog each catch-up tick. + - Result: queue lag dropped faster, but perceived motion could still feel + stepped because smooth-mode cadence remained coarse. +- Pass 2: faster baseline tick (25ms) + - Improved smooth-mode cadence and reduced visible stepping. + - Result: better, but still not aligned with draw cadence. +- Pass 3: frame-aligned baseline tick (~16.7ms) + - Set baseline commit cadence to approximately 60fps. + - Result: smoother perceived progression while retaining hysteresis and + fast backlog convergence. +- Pass 4: higher frame-aligned baseline tick (~8.3ms) + - Set baseline commit cadence to approximately 120fps. + - Result: further reduced smooth-mode stepping while preserving the same + adaptive catch-up policy shape. + +Current state combines: + +- instant catch-up draining in `CatchUp` +- hysteresis for mode-entry/exit stability +- frame-aligned smooth-mode commit cadence (~8.3ms) + +## Notes + +- Validation is source-agnostic and does not rely on naming any specific + upstream provider. +- This process intentionally preserves existing baseline smooth behavior and + focuses on burst/backlog handling behavior.