Skip to content
Closed
21 changes: 18 additions & 3 deletions cli/src/clients/admin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ pub trait AdminClientInterface {

async fn purge_invocation(&self, id: &str) -> reqwest::Result<Envelope<()>>;

async fn cancel_invocation(&self, id: &str, kill: bool) -> reqwest::Result<Envelope<()>>;
async fn cancel_invocation(
&self,
id: &str,
kill: bool,
retry: bool,
) -> reqwest::Result<Envelope<()>>;

async fn patch_state(
&self,
Expand Down Expand Up @@ -132,15 +137,25 @@ impl AdminClientInterface for AdminClient {
self.run(reqwest::Method::DELETE, url).await
}

async fn cancel_invocation(&self, id: &str, kill: bool) -> reqwest::Result<Envelope<()>> {
async fn cancel_invocation(
&self,
id: &str,
kill: bool,
retry: bool,
) -> reqwest::Result<Envelope<()>> {
let mut url = self
.base_url
.join(&format!("/invocations/{id}"))
.expect("Bad url!");

url.set_query(Some(&format!(
"mode={}",
if kill { "kill" } else { "cancel" }
match (kill, retry) {
(false, false) => "cancel",
(false, true) => "cancel-and-restart",
(true, false) => "kill",
(true, true) => "kill-and-restart",
}
)));

self.run(reqwest::Method::DELETE, url).await
Expand Down
18 changes: 12 additions & 6 deletions cli/src/commands/invocations/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub struct Cancel {
/// Ungracefully kill the invocation and its children
#[clap(long)]
kill: bool,
/// After cancelling/killing, restart the invocation using the same input.
#[clap(long, alias = "retry")]
restart: bool,
}

pub async fn run_cancel(State(env): State<CliEnv>, opts: &Cancel) -> Result<()> {
Expand Down Expand Up @@ -67,16 +70,19 @@ pub async fn run_cancel(State(env): State<CliEnv>, opts: &Cancel) -> Result<()>
// Get the invocation and confirm
let prompt = format!(
"Are you sure you want to {} these invocations?",
if opts.kill {
Styled(Style::Danger, "kill")
} else {
Styled(Style::Warn, "cancel")
},
match (opts.kill, opts.restart) {
(false, false) => Styled(Style::Warn, "cancel"),
(false, true) => Styled(Style::Warn, "cancel and restart"),
(true, false) => Styled(Style::Danger, "kill"),
(true, true) => Styled(Style::Danger, "kill and restart"),
}
);
confirm_or_exit(&prompt)?;

for inv in invocations {
let result = client.cancel_invocation(&inv.id, opts.kill).await?;
let result = client
.cancel_invocation(&inv.id, opts.kill, opts.restart)
.await?;
let _ = result.success_or_error()?;
}

Expand Down
10 changes: 10 additions & 0 deletions crates/admin/src/rest_api/invocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ pub enum DeletionMode {
Kill,
#[serde(alias = "purge")]
Purge,
#[serde(alias = "kill-and-restart")]
KillAndRestart,
#[serde(alias = "cancel-and-restart")]
CancelAndRestart,
}
#[derive(Debug, Default, Deserialize, JsonSchema)]
pub struct DeleteInvocationParams {
Expand Down Expand Up @@ -90,6 +94,12 @@ pub async fn delete_invocation<V>(
Command::TerminateInvocation(InvocationTermination::kill(invocation_id))
}
DeletionMode::Purge => Command::PurgeInvocation(PurgeInvocationRequest { invocation_id }),
DeletionMode::CancelAndRestart => {
Command::TerminateInvocation(InvocationTermination::cancel_and_restart(invocation_id))
}
DeletionMode::KillAndRestart => {
Command::TerminateInvocation(InvocationTermination::kill_and_restart(invocation_id))
}
};

let partition_key = invocation_id.partition_key();
Expand Down
18 changes: 11 additions & 7 deletions crates/partition-store/src/journal_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use restate_types::identifiers::{
};
use restate_types::storage::StorageCodec;
use std::io::Cursor;
use std::ops::RangeInclusive;
use std::ops::{Range, RangeInclusive};

define_table_key!(
Journal,
Expand Down Expand Up @@ -121,14 +121,14 @@ fn all_journals<S: StorageAccess>(
}))
}

fn delete_journal<S: StorageAccess>(
fn delete_journal_range<S: StorageAccess>(
storage: &mut S,
invocation_id: &InvocationId,
journal_length: EntryIndex,
journal_range: Range<EntryIndex>,
) {
let mut key = write_journal_entry_key(invocation_id, 0);
let k = &mut key;
for journal_index in 0..journal_length {
for journal_index in journal_range {
k.journal_index = Some(journal_index);
storage.delete_key(k);
}
Expand Down Expand Up @@ -201,10 +201,14 @@ impl<'a> JournalTable for PartitionStoreTransaction<'a> {
put_journal_entry(self, invocation_id, journal_index, journal_entry)
}

async fn delete_journal(&mut self, invocation_id: &InvocationId, journal_length: EntryIndex) {
async fn delete_journal_range(
&mut self,
invocation_id: &InvocationId,
journal_range: Range<EntryIndex>,
) {
self.assert_partition_key(invocation_id);
let _x = RocksDbPerfGuard::new("delete-journal");
delete_journal(self, invocation_id, journal_length)
let _x = RocksDbPerfGuard::new("delete-journal-range");
delete_journal_range(self, invocation_id, journal_range)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ fn invoked_status(invocation_target: InvocationTarget) -> InvocationStatus {
source: Source::Ingress(*RPC_REQUEST_ID),
completion_retention_duration: Duration::ZERO,
idempotency_key: None,
restart_when_completed: false,
})
}

Expand All @@ -105,6 +106,7 @@ fn killed_status(invocation_target: InvocationTarget) -> InvocationStatus {
source: Source::Ingress(*RPC_REQUEST_ID),
completion_retention_duration: Duration::ZERO,
idempotency_key: None,
restart_when_completed: false,
})
}

Expand All @@ -119,6 +121,7 @@ fn suspended_status(invocation_target: InvocationTarget) -> InvocationStatus {
source: Source::Ingress(*RPC_REQUEST_ID),
completion_retention_duration: Duration::ZERO,
idempotency_key: None,
restart_when_completed: false,
},
waiting_for_completed_entries: HashSet::default(),
}
Expand Down
17 changes: 17 additions & 0 deletions crates/storage-api/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ message InvocationStatusV2 {
uint32 journal_length = 14;
optional string deployment_id = 15;
optional dev.restate.service.protocol.ServiceProtocolVersion service_protocol_version = 16;
bool restart_when_completed = 23;

// Suspended
repeated uint32 waiting_for_completed_entries = 17;
Expand Down Expand Up @@ -519,14 +520,29 @@ message OutboxMessage {
ResponseResult response_result = 3;
}

// TODO remove this in Restate 1.3
message OutboxKill {
InvocationId invocation_id = 1;
}

// TODO remove this in Restate 1.3
message OutboxCancel {
InvocationId invocation_id = 1;
}

message OutboxTermination {
enum TerminationFlavor {
UNKNOWN = 0;
KILL = 1;
KILL_AND_RESTART = 2;
CANCEL = 3;
CANCEL_AND_RESTART = 4;
}

InvocationId invocation_id = 1;
TerminationFlavor flavor = 2;
}

message AttachInvocationRequest {
oneof query {
InvocationId invocation_id = 1;
Expand All @@ -543,6 +559,7 @@ message OutboxMessage {
OutboxKill kill = 4;
OutboxCancel cancel = 5;
AttachInvocationRequest attach_invocation_request = 6;
OutboxTermination termination = 7;
}

}
Expand Down
5 changes: 5 additions & 0 deletions crates/storage-api/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,9 @@ pub struct InFlightInvocationMetadata {
/// If zero, the invocation completion will not be retained.
pub completion_retention_duration: Duration,
pub idempotency_key: Option<ByteString>,

/// When the invocation completes, restart it.
pub restart_when_completed: bool,
}

impl InFlightInvocationMetadata {
Expand All @@ -496,6 +499,7 @@ impl InFlightInvocationMetadata {
completion_retention_duration: pre_flight_invocation_metadata
.completion_retention_duration,
idempotency_key: pre_flight_invocation_metadata.idempotency_key,
restart_when_completed: false,
},
InvocationInput {
argument: pre_flight_invocation_metadata.argument,
Expand Down Expand Up @@ -624,6 +628,7 @@ mod test_util {
source: Source::Ingress(PartitionProcessorRpcRequestId::default()),
completion_retention_duration: Duration::ZERO,
idempotency_key: None,
restart_when_completed: false,
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/storage-api/src/journal_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use restate_types::identifiers::{EntryIndex, InvocationId, JournalEntryId, Parti
use restate_types::journal::enriched::EnrichedRawEntry;
use restate_types::journal::{CompletionResult, EntryType};
use std::future::Future;
use std::ops::RangeInclusive;
use std::ops::{Range, RangeInclusive};

/// Different types of journal entries persisted by the runtime
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -78,5 +78,13 @@ pub trait JournalTable: ReadOnlyJournalTable {
&mut self,
invocation_id: &InvocationId,
journal_length: EntryIndex,
) -> impl Future<Output = ()> + Send {
self.delete_journal_range(invocation_id, 0..journal_length)
}

fn delete_journal_range(
&mut self,
invocation_id: &InvocationId,
journal_range: Range<EntryIndex>,
) -> impl Future<Output = ()> + Send;
}
Loading