Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 56 additions & 35 deletions crates/worker/src/partition/cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,34 +130,32 @@ where
// thus it will be cleaned up with the old timer.
continue;
};
let Some(status_expiration_time) = SystemTime::from(completed_time)
.checked_add(completed_invocation.completion_retention_duration)
else {
// If sum overflow, then the cleanup time lies far enough in the future
continue;
};

let now = SystemTime::now();
if now >= status_expiration_time {
restate_bifrost::append_to_bifrost(
bifrost,
Arc::new(Envelope {
header: Header {
source: bifrost_envelope_source.clone(),
dest: Destination::Processor {
partition_key: invocation_id.partition_key(),
dedup: None,
if let Some(status_expiration_time) = SystemTime::from(completed_time)
.checked_add(completed_invocation.completion_retention_duration)
{
if now >= status_expiration_time {
restate_bifrost::append_to_bifrost(
bifrost,
Arc::new(Envelope {
header: Header {
source: bifrost_envelope_source.clone(),
dest: Destination::Processor {
partition_key: invocation_id.partition_key(),
dedup: None,
},
},
},
command: Command::PurgeInvocation(PurgeInvocationRequest {
invocation_id,
response_sink: None,
command: Command::PurgeInvocation(PurgeInvocationRequest {
invocation_id,
response_sink: None,
}),
}),
}),
)
.await
.context("Cannot append to bifrost purge invocation")?;
continue;
)
.await
.context("Cannot append to bifrost purge invocation")?;
continue;
}
}

// We don't cleanup the status yet, let's check if there's a journal to cleanup
Expand Down Expand Up @@ -209,7 +207,7 @@ mod tests {
use restate_storage_api::StorageError;
use restate_storage_api::invocation_status_table::{
CompletedInvocation, InFlightInvocationMetadata, InvocationStatus,
InvokedInvocationStatusLite,
InvokedInvocationStatusLite, JournalMetadata,
};
use restate_types::Version;
use restate_types::identifiers::{InvocationId, InvocationUuid};
Expand Down Expand Up @@ -266,6 +264,8 @@ mod tests {

let expired_invocation =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
let expired_journal =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
let not_expired_invocation_1 =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
let not_expired_invocation_2 =
Expand All @@ -281,6 +281,19 @@ mod tests {
..CompletedInvocation::mock_neo()
}),
),
(
expired_journal,
InvocationStatus::Completed(CompletedInvocation {
completion_retention_duration: Duration::MAX,
journal_retention_duration: Duration::ZERO,
journal_metadata: JournalMetadata {
length: 2,
commands: 2,
span_context: Default::default(),
},
..CompletedInvocation::mock_neo()
}),
),
(
not_expired_invocation_1,
InvocationStatus::Completed(CompletedInvocation {
Expand Down Expand Up @@ -324,19 +337,27 @@ mod tests {
})
.unwrap();

let mut log_entries = bifrost.read_all(partition_id.into()).await.unwrap();
let bifrost_message = log_entries
.remove(0)
.try_decode::<Envelope>()
let log_entries: Vec<_> = bifrost
.read_all(partition_id.into())
.await
.unwrap()
.unwrap();
.into_iter()
.map(|e| e.try_decode::<Envelope>().unwrap().unwrap().command)
.collect();

assert_that!(
bifrost_message.command,
pat!(Command::PurgeInvocation(pat!(PurgeInvocationRequest {
invocation_id: eq(expired_invocation)
})))
log_entries,
all!(
len(eq(2)),
contains(pat!(Command::PurgeInvocation(pat!(
PurgeInvocationRequest {
invocation_id: eq(expired_invocation),
}
)))),
contains(pat!(Command::PurgeJournal(pat!(PurgeInvocationRequest {
invocation_id: eq(expired_journal),
})))),
)
);
assert_that!(log_entries, empty());
}
}
Loading