Skip to content

Commit

Permalink
Fix #2443 (#2599)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Jan 31, 2025
1 parent c3e4ac5 commit f01a7e0
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 21 deletions.
30 changes: 20 additions & 10 deletions crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tracing::{info, trace, trace_span, Instrument};

use super::path_parsing::{InvokeType, ServiceRequestType, TargetType};
use super::tracing::prepare_tracing_span;
use super::HandlerError;
use super::{Handler, APPLICATION_JSON};
use crate::handler::responses::{IDEMPOTENCY_EXPIRES, X_RESTATE_ID};
use crate::metric_definitions::{INGRESS_REQUESTS, INGRESS_REQUEST_DURATION, REQUEST_COMPLETED};
use crate::RequestDispatcher;
use restate_types::identifiers::{InvocationId, WithInvocationId};
use restate_types::invocation::{
Header, InvocationRequest, InvocationRequestHeader, InvocationTarget, InvocationTargetType,
Expand All @@ -28,14 +35,7 @@ use restate_types::invocation::{
use restate_types::schema::invocation_target::{
InvocationTargetMetadata, InvocationTargetResolver,
};

use super::path_parsing::{InvokeType, ServiceRequestType, TargetType};
use super::tracing::prepare_tracing_span;
use super::HandlerError;
use super::{Handler, APPLICATION_JSON};
use crate::handler::responses::{IDEMPOTENCY_EXPIRES, X_RESTATE_ID};
use crate::metric_definitions::{INGRESS_REQUESTS, INGRESS_REQUEST_DURATION, REQUEST_COMPLETED};
use crate::RequestDispatcher;
use restate_types::time::MillisSinceEpoch;

pub(crate) const IDEMPOTENCY_KEY: HeaderName = HeaderName::from_static("idempotency-key");
const DELAY_QUERY_PARAM: &str = "delay";
Expand Down Expand Up @@ -255,7 +255,6 @@ where
dispatcher: Dispatcher,
) -> Result<Response<Full<Bytes>>, HandlerError> {
let invocation_id = invocation_request.invocation_id();
let execution_time = invocation_request.header.execution_time;

// Send the service invocation, wait for the submit notification
let response = dispatcher.send(invocation_request).await?;
Expand All @@ -268,7 +267,18 @@ where
.body(Full::new(
serde_json::to_vec(&SendResponse {
invocation_id,
execution_time: execution_time.map(SystemTime::from).map(Into::into),
execution_time: response
.execution_time
.and_then(|m| {
if m == MillisSinceEpoch::UNIX_EPOCH {
// Ignore
None
} else {
Some(m)
}
})
.map(SystemTime::from)
.map(Into::into),
status: if response.is_new_invocation {
SendStatus::Accepted
} else {
Expand Down
5 changes: 5 additions & 0 deletions crates/ingress-http/src/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ async fn send_service() {

ready(Ok(SubmittedInvocationNotification {
request_id: Default::default(),
execution_time: None,
is_new_invocation: true,
}))
.boxed()
Expand Down Expand Up @@ -299,6 +300,7 @@ async fn send_with_delay_service() {

ready(Ok(SubmittedInvocationNotification {
request_id: Default::default(),
execution_time: None,
is_new_invocation: true,
}))
.boxed()
Expand Down Expand Up @@ -345,6 +347,7 @@ async fn send_virtual_object() {

ready(Ok(SubmittedInvocationNotification {
request_id: Default::default(),
execution_time: None,
is_new_invocation: true,
}))
.boxed()
Expand Down Expand Up @@ -465,6 +468,7 @@ async fn idempotency_key_and_send() {

ready(Ok(SubmittedInvocationNotification {
request_id: Default::default(),
execution_time: None,
is_new_invocation: true,
}))
.boxed()
Expand Down Expand Up @@ -525,6 +529,7 @@ async fn idempotency_key_and_send_with_different_invocation_id() {

ready(Ok(SubmittedInvocationNotification {
request_id: Default::default(),
execution_time: None,
is_new_invocation: true,
}))
.boxed()
Expand Down
9 changes: 9 additions & 0 deletions crates/storage-api/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,15 @@ impl InvocationStatus {
}
}

#[inline]
pub fn execution_time(&self) -> Option<MillisSinceEpoch> {
match self {
InvocationStatus::Scheduled(metadata) => metadata.metadata.execution_time,
InvocationStatus::Inboxed(metadata) => metadata.metadata.execution_time,
_ => None,
}
}

#[inline]
pub fn idempotency_key(&self) -> Option<&ByteString> {
match self {
Expand Down
1 change: 1 addition & 0 deletions crates/types/src/net/partition_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub enum PartitionProcessorRpcResponse {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SubmittedInvocationNotification {
pub request_id: PartitionProcessorRpcRequestId,
pub execution_time: Option<MillisSinceEpoch>,
/// If true, this request_id created a "fresh invocation",
/// otherwise the invocation was previously submitted.
pub is_new_invocation: bool,
Expand Down
2 changes: 2 additions & 0 deletions crates/worker/src/partition/leadership/leader_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,13 +423,15 @@ impl LeaderState {
}
Action::IngressSubmitNotification {
request_id,
execution_time,
is_new_invocation,
..
} => {
if let Some(response_tx) = self.awaiting_rpc_actions.remove(&request_id) {
respond_to_rpc(response_tx.prepare(Ok(
PartitionProcessorRpcResponse::Submitted(SubmittedInvocationNotification {
request_id,
execution_time,
is_new_invocation,
}),
)));
Expand Down
1 change: 1 addition & 0 deletions crates/worker/src/partition/state_machine/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub enum Action {
},
IngressSubmitNotification {
request_id: PartitionProcessorRpcRequestId,
execution_time: Option<MillisSinceEpoch>,
/// If true, this request_id created a "fresh invocation",
/// otherwise the invocation was previously submitted.
is_new_invocation: bool,
Expand Down
22 changes: 18 additions & 4 deletions crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,12 +526,18 @@ impl<'a, S> StateMachineApplyContext<'a, S> {
PreFlightInvocationMetadata::from_service_invocation(service_invocation);

// 2. Check if we need to schedule it
let execution_time = pre_flight_invocation_metadata.execution_time;
let Some(pre_flight_invocation_metadata) = self
.handle_service_invocation_execution_time(invocation_id, pre_flight_invocation_metadata)
.await?
else {
// Invocation was scheduled, send back the ingress attach notification and return
self.send_submit_notification_if_needed(invocation_id, true, submit_notification_sink);
self.send_submit_notification_if_needed(
invocation_id,
execution_time,
true,
submit_notification_sink,
);
return Ok(());
};

Expand All @@ -544,15 +550,20 @@ impl<'a, S> StateMachineApplyContext<'a, S> {
.await?
else {
// Invocation was inboxed, send back the ingress attach notification and return
self.send_submit_notification_if_needed(invocation_id, true, submit_notification_sink);
self.send_submit_notification_if_needed(
invocation_id,
execution_time,
true,
submit_notification_sink,
);
// Invocation was inboxed, nothing else to do here
return Ok(());
};

// 4. Execute it
Self::send_submit_notification_if_needed(
self,
self.send_submit_notification_if_needed(
invocation_id,
pre_flight_invocation_metadata.execution_time,
true,
submit_notification_sink,
);
Expand Down Expand Up @@ -675,6 +686,7 @@ impl<'a, S> StateMachineApplyContext<'a, S> {
// Send submit notification
self.send_submit_notification_if_needed(
service_invocation.invocation_id,
previous_invocation_status.execution_time(),
// is_new_invocation is true if the RPC ingress request is a duplicate.
service_invocation.source
== *previous_invocation_status
Expand Down Expand Up @@ -3469,6 +3481,7 @@ impl<'a, S> StateMachineApplyContext<'a, S> {
fn send_submit_notification_if_needed(
&mut self,
invocation_id: InvocationId,
execution_time: Option<MillisSinceEpoch>,
is_new_invocation: bool,
submit_notification_sink: Option<SubmitNotificationSink>,
) {
Expand All @@ -3483,6 +3496,7 @@ impl<'a, S> StateMachineApplyContext<'a, S> {
self.action_collector
.push(Action::IngressSubmitNotification {
request_id,
execution_time,
is_new_invocation,
});
}
Expand Down
19 changes: 12 additions & 7 deletions crates/worker/src/partition/state_machine/tests/delayed_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async fn send_with_delay() {
contains(pat!(Action::RegisterTimer { .. })),
contains(eq(Action::IngressSubmitNotification {
request_id,
execution_time: Some(wake_up_time),
is_new_invocation: true
}))
)
Expand All @@ -63,6 +64,7 @@ async fn send_with_delay() {
contains(matchers::actions::invoke_for_id(invocation_id)),
not(contains(eq(Action::IngressSubmitNotification {
request_id,
execution_time: Some(wake_up_time),
is_new_invocation: true,
})))
)
Expand Down Expand Up @@ -102,6 +104,7 @@ async fn send_with_delay_to_locked_virtual_object() {
contains(pat!(Action::RegisterTimer { .. })),
contains(eq(Action::IngressSubmitNotification {
request_id,
execution_time: Some(wake_up_time),
is_new_invocation: true,
}))
)
Expand Down Expand Up @@ -130,6 +133,7 @@ async fn send_with_delay_to_locked_virtual_object() {
not(contains(matchers::actions::invoke_for_id(invocation_id))),
not(contains(eq(Action::IngressSubmitNotification {
request_id,
execution_time: Some(wake_up_time),
is_new_invocation: true,
})))
)
Expand Down Expand Up @@ -162,6 +166,9 @@ async fn send_with_delay_and_idempotency_key() {
let invocation_id = InvocationId::generate(&invocation_target, Some(&idempotency_key));

let request_id_1 = PartitionProcessorRpcRequestId::default();
let execution_time = Some(MillisSinceEpoch::from(
SystemTime::now() + Duration::from_secs(60),
));

let actions = test_env
.apply(Command::Invoke(ServiceInvocation {
Expand All @@ -173,9 +180,7 @@ async fn send_with_delay_and_idempotency_key() {
}),
completion_retention_duration: Some(retention),
// Doesn't matter the execution time here, just needs to be filled
execution_time: Some(MillisSinceEpoch::from(
SystemTime::now() + Duration::from_secs(60),
)),
execution_time,
source: Source::Ingress(request_id_1),
..ServiceInvocation::mock()
}))
Expand All @@ -187,6 +192,7 @@ async fn send_with_delay_and_idempotency_key() {
contains(pat!(Action::RegisterTimer { .. })),
contains(eq(Action::IngressSubmitNotification {
request_id: request_id_1,
execution_time,
is_new_invocation: true,
}))
)
Expand All @@ -203,10 +209,8 @@ async fn send_with_delay_and_idempotency_key() {
request_id: request_id_2,
}),
completion_retention_duration: Some(retention),
// Doesn't matter the execution time here, just needs to be filled
execution_time: Some(MillisSinceEpoch::from(
SystemTime::now() + Duration::from_secs(60),
)),
// Needs to be different from the original one!
execution_time: execution_time.map(|m| m + Duration::from_secs(10)),
source: Source::Ingress(request_id_2),
..ServiceInvocation::mock()
}))
Expand All @@ -217,6 +221,7 @@ async fn send_with_delay_and_idempotency_key() {
not(contains(matchers::actions::invoke_for_id(invocation_id))),
contains(eq(Action::IngressSubmitNotification {
request_id: request_id_2,
execution_time,
is_new_invocation: false,
}))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ async fn attach_with_send_service_invocation(
not(contains(pat!(Action::IngressResponse { .. }))),
contains(eq(Action::IngressSubmitNotification {
request_id: request_id_2,
execution_time: None,
is_new_invocation: use_same_request_id,
}))
)
Expand Down Expand Up @@ -575,6 +576,7 @@ async fn attach_inboxed_with_send_service_invocation(
}))),
contains(eq(Action::IngressSubmitNotification {
request_id: request_id_1,
execution_time: None,
is_new_invocation: true,
}))
)
Expand Down Expand Up @@ -618,6 +620,7 @@ async fn attach_inboxed_with_send_service_invocation(
not(contains(pat!(Action::IngressResponse { .. }))),
contains(eq(Action::IngressSubmitNotification {
request_id: request_id_2,
execution_time: None,
is_new_invocation: false,
}))
)
Expand Down

0 comments on commit f01a7e0

Please sign in to comment.