From dbcfdbdddf869f0ec9f6ecca3b4a6b9745af9650 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 30 Apr 2025 10:56:56 +0200 Subject: [PATCH] Add stale journal read check in invoker. --- crates/invoker-api/src/invocation_reader.rs | 5 ++++- crates/invoker-api/src/lib.rs | 1 + crates/invoker-impl/src/error.rs | 9 +++++++++ crates/invoker-impl/src/invocation_task/mod.rs | 7 +++++++ crates/worker/src/partition/invoker_storage_reader.rs | 1 + crates/worker/src/partition/state_machine/mod.rs | 1 + 6 files changed, 23 insertions(+), 1 deletion(-) diff --git a/crates/invoker-api/src/invocation_reader.rs b/crates/invoker-api/src/invocation_reader.rs index b21c14249d..1c1df921ec 100644 --- a/crates/invoker-api/src/invocation_reader.rs +++ b/crates/invoker-api/src/invocation_reader.rs @@ -12,7 +12,7 @@ use bytes::Bytes; use futures::Stream; use restate_types::deployment::PinnedDeployment; use restate_types::identifiers::{InvocationId, ServiceId}; -use restate_types::invocation::ServiceInvocationSpanContext; +use restate_types::invocation::{InvocationEpoch, ServiceInvocationSpanContext}; use restate_types::journal::EntryIndex; use restate_types::journal::raw::PlainRawEntry; use restate_types::journal_v2::raw::RawEntry; @@ -25,6 +25,7 @@ pub struct JournalMetadata { pub length: EntryIndex, pub span_context: ServiceInvocationSpanContext, pub pinned_deployment: Option, + pub invocation_epoch: InvocationEpoch, /// This value is not agreed among Partition processor replicas right now. /// /// The upper bound for the total clock skew is the clock skew of the different machines @@ -37,6 +38,7 @@ impl JournalMetadata { length: EntryIndex, span_context: ServiceInvocationSpanContext, pinned_deployment: Option, + invocation_epoch: InvocationEpoch, last_modification_date: MillisSinceEpoch, ) -> Self { Self { @@ -44,6 +46,7 @@ impl JournalMetadata { span_context, length, last_modification_date, + invocation_epoch, } } } diff --git a/crates/invoker-api/src/lib.rs b/crates/invoker-api/src/lib.rs index d3a658b47b..eef602f257 100644 --- a/crates/invoker-api/src/lib.rs +++ b/crates/invoker-api/src/lib.rs @@ -70,6 +70,7 @@ pub mod test_util { 0, ServiceInvocationSpanContext::empty(), None, + 0, MillisSinceEpoch::UNIX_EPOCH, ), futures::stream::empty(), diff --git a/crates/invoker-impl/src/error.rs b/crates/invoker-impl/src/error.rs index f634822468..2a96607378 100644 --- a/crates/invoker-impl/src/error.rs +++ b/crates/invoker-impl/src/error.rs @@ -14,6 +14,7 @@ use restate_service_client::ServiceClientError; use restate_service_protocol::message::{EncodingError, MessageType}; use restate_types::errors::{InvocationError, InvocationErrorCode, codes}; use restate_types::identifiers::DeploymentId; +use restate_types::invocation::InvocationEpoch; use restate_types::journal::raw::RawEntryCodecError; use restate_types::journal::{EntryIndex, EntryType}; use restate_types::journal_v2; @@ -105,6 +106,14 @@ pub(crate) enum InvokerError { #[error("error when trying to read the service instance state: {0}")] #[code(restate_errors::RT0006)] StateReader(anyhow::Error), + #[error( + "error when reading the journal: actual epoch {actual} != expected epoch {expected}. This is expected to happen while a trim and restart is being processed." + )] + #[code(restate_errors::RT0006)] + StaleJournalRead { + actual: InvocationEpoch, + expected: InvocationEpoch, + }, #[error(transparent)] #[code(restate_errors::RT0010)] diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index 338e7061f3..f0a2331804 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -285,6 +285,13 @@ where ), }; + if self.invocation_epoch != journal_metadata.invocation_epoch { + shortcircuit!(Err(InvokerError::StaleJournalRead { + actual: journal_metadata.invocation_epoch, + expected: self.invocation_epoch + })); + } + // Resolve the deployment metadata let schemas = self.schemas.live_load(); let (deployment, chosen_service_protocol_version, deployment_changed) = diff --git a/crates/worker/src/partition/invoker_storage_reader.rs b/crates/worker/src/partition/invoker_storage_reader.rs index 02c1bdbbea..9196732d5b 100644 --- a/crates/worker/src/partition/invoker_storage_reader.rs +++ b/crates/worker/src/partition/invoker_storage_reader.rs @@ -83,6 +83,7 @@ where invoked_status.journal_metadata.length, invoked_status.journal_metadata.span_context, invoked_status.pinned_deployment.clone(), + invoked_status.current_invocation_epoch, // SAFETY: this value is used by the invoker, it's ok if it's not in sync unsafe { invoked_status.timestamps.modification_time() }, ); diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 0b7432f5b5..d8ce718670 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -907,6 +907,7 @@ impl StateMachineApplyContext<'_, S> { .span_context .clone(), None, + in_flight_invocation_metadata.current_invocation_epoch, // This is safe to do as only the leader will execute the invoker command MillisSinceEpoch::now(), ),