Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 79 additions & 51 deletions crates/goose/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,8 @@ impl Scheduler {
schedule_sessions.push((session.id.clone(), session));
}
}
schedule_sessions.sort_by(|a, b| b.0.cmp(&a.0));
// Sort by created_at timestamp, newest first
schedule_sessions.sort_by(|a, b| b.1.created_at.cmp(&a.1.created_at));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newer sessions were at the bottom of the list, again causing a confusing UX


let result_sessions: Vec<(String, Session)> =
schedule_sessions.into_iter().take(limit).collect();
Expand Down Expand Up @@ -1199,65 +1200,92 @@ async fn run_scheduled_job_internal(
}
}

if let Some(ref prompt_text) = recipe.prompt {
let mut conversation =
Conversation::new_unvalidated(vec![Message::user().with_text(prompt_text.clone())]);

let session_config = SessionConfig {
id: session.id.clone(),
working_dir: current_dir.clone(),
schedule_id: Some(job.id.clone()),
execution_mode: job.execution_mode.clone(),
max_turns: None,
retry_config: None,
};
if recipe.prompt.as_ref().map_or(true, |s| s.trim().is_empty()) {
let error_message = Message::assistant().with_text(format!(
"Schedule Execution Failed\n\n\
This recipe does not have a valid 'prompt' field, which is required for scheduled execution.\n\n\
To fix this issue, add a 'prompt' field to your recipe with the instructions you want to execute."
));

match agent
.reply(conversation.clone(), Some(session_config.clone()), None)
SessionManager::add_message(&session.id, &error_message)
.await
{
Ok(mut stream) => {
use futures::StreamExt;
.map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to add error message to session: {}", e),
})?;

while let Some(message_result) = stream.next().await {
tokio::task::yield_now().await;
SessionManager::update_session(&session.id)
.schedule_id(Some(job.id.clone()))
.recipe(Some(recipe))
.apply()
.await
.map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to update session metadata: {}", e),
})?;

match message_result {
Ok(AgentEvent::Message(msg)) => {
if msg.role == rmcp::model::Role::Assistant {
tracing::info!("[Job {}] Assistant: {:?}", job.id, msg.content);
}
conversation.push(msg);
}
Ok(AgentEvent::McpNotification(_)) => {}
Ok(AgentEvent::ModelChange { .. }) => {}
Ok(AgentEvent::HistoryReplaced(updated_conversation)) => {
conversation = updated_conversation;
}
Err(e) => {
tracing::error!(
"[Job {}] Error receiving message from agent: {}",
job.id,
e
);
break;
tracing::error!(
"[Job {}] Recipe '{}' has no valid prompt to execute. Created session {} with error message.",
job.id,
job.source,
session.id
);

return Ok(session.id);
}

let prompt_text = recipe.prompt.as_ref().unwrap();
let mut conversation =
Conversation::new_unvalidated(vec![Message::user().with_text(prompt_text.clone())]);

let session_config = SessionConfig {
id: session.id.clone(),
working_dir: current_dir.clone(),
schedule_id: Some(job.id.clone()),
execution_mode: job.execution_mode.clone(),
max_turns: None,
retry_config: None,
};

match agent
.reply(conversation.clone(), Some(session_config.clone()), None)
.await
{
Ok(mut stream) => {
use futures::StreamExt;

while let Some(message_result) = stream.next().await {
tokio::task::yield_now().await;

match message_result {
Ok(AgentEvent::Message(msg)) => {
if msg.role == rmcp::model::Role::Assistant {
tracing::info!("[Job {}] Assistant: {:?}", job.id, msg.content);
}
conversation.push(msg);
}
Ok(AgentEvent::McpNotification(_)) => {}
Ok(AgentEvent::ModelChange { .. }) => {}
Ok(AgentEvent::HistoryReplaced(updated_conversation)) => {
conversation = updated_conversation;
}
Err(e) => {
tracing::error!(
"[Job {}] Error receiving message from agent: {}",
job.id,
e
);
break;
}
}
}
Err(e) => {
return Err(JobExecutionError {
job_id: job.id.clone(),
error: format!("Agent failed to reply for recipe '{}': {}", job.source, e),
});
}
}
} else {
tracing::warn!(
"[Job {}] Recipe '{}' has no prompt to execute.",
job.id,
job.source
);
Err(e) => {
return Err(JobExecutionError {
job_id: job.id.clone(),
error: format!("Agent failed to reply for recipe '{}': {}", job.source, e),
});
}
}

if let Err(e) = SessionManager::update_session(&session.id)
Expand Down
Loading