From 8ae21720d353f566ded62b049064679094a671ed Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 27 Aug 2025 09:09:11 +0000 Subject: [PATCH 1/6] handle multiple ongoing flush --- src/moonlink/src/storage/mooncake_table.rs | 49 ++++++++----- .../src/storage/mooncake_table/tests.rs | 70 +++++++++---------- .../mooncake_table/transaction_stream.rs | 11 ++- src/moonlink/src/table_handler/tests.rs | 6 +- 4 files changed, 76 insertions(+), 60 deletions(-) diff --git a/src/moonlink/src/storage/mooncake_table.rs b/src/moonlink/src/storage/mooncake_table.rs index 761dd3b21..f61dcb471 100644 --- a/src/moonlink/src/storage/mooncake_table.rs +++ b/src/moonlink/src/storage/mooncake_table.rs @@ -65,7 +65,7 @@ use delete_vector::BatchDeletionVector; pub(crate) use disk_slice::DiskSliceWriter; use mem_slice::MemSlice; pub(crate) use snapshot::SnapshotTableState; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::path::PathBuf; use std::sync::Arc; use table_snapshot::{IcebergSnapshotImportResult, IcebergSnapshotIndexMergeResult}; @@ -462,7 +462,8 @@ pub struct MooncakeTable { wal_manager: WalManager, /// LSN of ongoing flushes. - pub ongoing_flush_lsns: BTreeSet<u64>, + /// Maps from LSN to its count. + pub ongoing_flush_lsns: BTreeMap<u64, u32>, /// Table replay sender.
event_replay_tx: Option>, @@ -565,7 +566,7 @@ impl MooncakeTable { last_iceberg_snapshot_lsn, table_notify: None, wal_manager, - ongoing_flush_lsns: BTreeSet::new(), + ongoing_flush_lsns: BTreeMap::new(), event_replay_tx: None, }) } @@ -843,10 +844,11 @@ impl MooncakeTable { disk_slice: &mut DiskSliceWriter, table_notify_tx: Sender, xact_id: Option, + ongoing_flush_count: u32, event_id: uuid::Uuid, ) { if let Some(lsn) = disk_slice.lsn() { - self.insert_ongoing_flush_lsn(lsn); + self.insert_ongoing_flush_lsn(lsn, ongoing_flush_count); } else { assert!( xact_id.is_some(), @@ -913,31 +915,39 @@ impl MooncakeTable { fn try_set_next_flush_lsn(&mut self, lsn: u64) { let min_pending_lsn = self.get_min_ongoing_flush_lsn(); if lsn < min_pending_lsn { + // TODO(hjiang): Add assertion that flush LSN never regresses, currently it's still buggy, I will fix in the followup PR. self.next_snapshot_task.new_flush_lsn = Some(lsn); } } // We fallback to u64::MAX if there are no pending flush LSNs so that the LSN is always greater than the flush LSN and the iceberg snapshot can proceed. 
pub fn get_min_ongoing_flush_lsn(&self) -> u64 { - self.ongoing_flush_lsns - .iter() - .next() - .copied() - .unwrap_or(u64::MAX) + if let Some((lsn, _)) = self.ongoing_flush_lsns.first_key_value() { + return *lsn; + } + u64::MAX } - pub fn insert_ongoing_flush_lsn(&mut self, lsn: u64) { - assert!( - self.ongoing_flush_lsns.insert(lsn), - "LSN {lsn} already in pending flush LSNs" - ); + pub fn insert_ongoing_flush_lsn(&mut self, lsn: u64, count: u32) { + *self.ongoing_flush_lsns.entry(lsn).or_insert(0) += count; } pub fn remove_ongoing_flush_lsn(&mut self, lsn: u64) { - assert!( - self.ongoing_flush_lsns.remove(&lsn), - "LSN {lsn} not found in pending flush LSNs" - ); + use std::collections::btree_map::Entry; + + match self.ongoing_flush_lsns.entry(lsn) { + Entry::Occupied(mut entry) => { + let counter = entry.get_mut(); + if *counter > 1 { + *counter -= 1; + } else { + entry.remove(); + } + } + Entry::Vacant(_) => { + panic!("Tried to remove LSN {lsn}, but it is not tracked"); + } + } } pub fn has_ongoing_flush(&self) -> bool { @@ -1203,7 +1213,7 @@ impl MooncakeTable { ); let table_notify_tx = self.table_notify.as_ref().unwrap().clone(); - if self.mem_slice.is_empty() || self.ongoing_flush_lsns.contains(&lsn) { + if self.mem_slice.is_empty() || self.ongoing_flush_lsns.contains_key(&lsn) { self.try_set_next_flush_lsn(lsn); tokio::task::spawn(async move { table_notify_tx @@ -1236,6 +1246,7 @@ impl MooncakeTable { &mut disk_slice, table_notify_tx, /*xact_id=*/ None, + /*ongoing_flush_count=*/ 1, event_id, ); diff --git a/src/moonlink/src/storage/mooncake_table/tests.rs b/src/moonlink/src/storage/mooncake_table/tests.rs index 5ca24042f..e654b4d6c 100644 --- a/src/moonlink/src/storage/mooncake_table/tests.rs +++ b/src/moonlink/src/storage/mooncake_table/tests.rs @@ -1300,9 +1300,9 @@ async fn test_ongoing_flush_lsns_tracking() -> Result<()> { .expect("Disk slice 3 should be present"); // Verify all LSNs are tracked - assert!(table.ongoing_flush_lsns.contains(&5)); - 
assert!(table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&15)); + assert!(table.ongoing_flush_lsns.contains_key(&5)); + assert!(table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&15)); assert_eq!(table.ongoing_flush_lsns.len(), 3); // Verify min is correctly calculated (should be 5) @@ -1310,15 +1310,15 @@ async fn test_ongoing_flush_lsns_tracking() -> Result<()> { // Complete flush with LSN 10 (out of order completion) table.apply_flush_result(disk_slice_2, uuid::Uuid::new_v4() /*placeholder*/); - assert!(table.ongoing_flush_lsns.contains(&5)); - assert!(!table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&15)); + assert!(table.ongoing_flush_lsns.contains_key(&5)); + assert!(!table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&15)); assert_eq!(table.get_min_ongoing_flush_lsn(), 5); // Still 5 // Complete flush with LSN 5 table.apply_flush_result(disk_slice_1, uuid::Uuid::new_v4() /*placeholder*/); - assert!(!table.ongoing_flush_lsns.contains(&5)); - assert!(table.ongoing_flush_lsns.contains(&15)); + assert!(!table.ongoing_flush_lsns.contains_key(&5)); + assert!(table.ongoing_flush_lsns.contains_key(&15)); assert_eq!(table.get_min_ongoing_flush_lsn(), 15); // Now 15 // Complete last flush @@ -1355,8 +1355,8 @@ async fn test_streaming_flush_lsns_tracking() -> Result<()> { .expect("Disk slice 2 should be present"); // Verify both streaming LSNs are tracked - assert!(table.ongoing_flush_lsns.contains(&100)); - assert!(table.ongoing_flush_lsns.contains(&50)); + assert!(table.ongoing_flush_lsns.contains_key(&100)); + assert!(table.ongoing_flush_lsns.contains_key(&50)); assert_eq!(table.get_min_ongoing_flush_lsn(), 50); // Mix with regular flush (must be higher than previous regular flush) @@ -1367,9 +1367,9 @@ async fn test_streaming_flush_lsns_tracking() -> Result<()> { .expect("Disk slice 3 should be present"); // 
Verify all three LSNs are tracked - assert!(table.ongoing_flush_lsns.contains(&100)); - assert!(table.ongoing_flush_lsns.contains(&50)); - assert!(table.ongoing_flush_lsns.contains(&75)); + assert!(table.ongoing_flush_lsns.contains_key(&100)); + assert!(table.ongoing_flush_lsns.contains_key(&50)); + assert!(table.ongoing_flush_lsns.contains_key(&75)); assert_eq!(table.get_min_ongoing_flush_lsn(), 50); // Complete streaming flushes @@ -1580,22 +1580,22 @@ async fn test_out_of_order_flush_completion() -> Result<()> { .expect("Disk slice should be present"); // Verify all are pending and min is 10 - assert!(table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&20)); - assert!(table.ongoing_flush_lsns.contains(&30)); + assert!(table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&20)); + assert!(table.ongoing_flush_lsns.contains_key(&30)); assert_eq!(table.get_min_ongoing_flush_lsn(), 10); // Complete flush 30 first (out of order) table.apply_flush_result(disk_slice_30, uuid::Uuid::new_v4() /*placeholder*/); - assert!(!table.ongoing_flush_lsns.contains(&30)); - assert!(table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&20)); + assert!(!table.ongoing_flush_lsns.contains_key(&30)); + assert!(table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&20)); assert_eq!(table.get_min_ongoing_flush_lsn(), 10); // Still 10 // Complete flush 10 (should update min to 20) table.apply_flush_result(disk_slice_10, uuid::Uuid::new_v4() /*placeholder*/); - assert!(!table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&20)); + assert!(!table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&20)); assert_eq!(table.get_min_ongoing_flush_lsn(), 20); // Complete flush 20 (should clear all) @@ -1631,8 +1631,8 @@ async fn test_mixed_regular_and_streaming_lsn_ordering() -> Result<()> 
{ .expect("Streaming disk slice should be present"); // Verify both are tracked and min is 50 - assert!(table.ongoing_flush_lsns.contains(&100)); - assert!(table.ongoing_flush_lsns.contains(&50)); + assert!(table.ongoing_flush_lsns.contains_key(&100)); + assert!(table.ongoing_flush_lsns.contains_key(&50)); assert_eq!(table.get_min_ongoing_flush_lsn(), 50); // According to table handler logic, iceberg snapshots with flush_lsn >= 50 should be blocked @@ -1662,8 +1662,8 @@ async fn test_mixed_regular_and_streaming_lsn_ordering() -> Result<()> { streaming_disk_slice, uuid::Uuid::new_v4(), /*placeholder*/ ); - assert!(!table.ongoing_flush_lsns.contains(&50)); - assert!(table.ongoing_flush_lsns.contains(&100)); + assert!(!table.ongoing_flush_lsns.contains_key(&50)); + assert!(table.ongoing_flush_lsns.contains_key(&100)); assert_eq!(table.get_min_ongoing_flush_lsn(), 100); // Now iceberg snapshots with flush_lsn < 100 should be allowed @@ -1988,7 +1988,7 @@ async fn test_iceberg_snapshot_blocked_by_ongoing_flushes() -> Result<()> { table.commit(2); // Verify we have pending flushes - assert!(table.ongoing_flush_lsns.contains(&30)); + assert!(table.ongoing_flush_lsns.contains_key(&30)); assert_eq!(table.get_min_ongoing_flush_lsn(), 30); // Create a mooncake snapshot - this will create an iceberg payload @@ -2090,9 +2090,9 @@ async fn test_out_of_order_flush_completion_with_iceberg_snapshots() -> Result<( // Verify all pending flushes and min pending LSN assert_eq!(table.get_min_ongoing_flush_lsn(), 10); - assert!(table.ongoing_flush_lsns.contains(&10)); - assert!(table.ongoing_flush_lsns.contains(&20)); - assert!(table.ongoing_flush_lsns.contains(&30)); + assert!(table.ongoing_flush_lsns.contains_key(&10)); + assert!(table.ongoing_flush_lsns.contains_key(&20)); + assert!(table.ongoing_flush_lsns.contains_key(&30)); // Create snapshot and test constraint with min_ongoing_flush_lsn = 10 let created = table.create_snapshot(SnapshotOption { @@ -2136,16 +2136,16 @@ async fn 
test_out_of_order_flush_completion_with_iceberg_snapshots() -> Result<( // Complete flushes OUT OF ORDER - complete middle one first (LSN 20) table.apply_flush_result(disk_slice_2, uuid::Uuid::new_v4() /*placeholder*/); assert_eq!(table.get_min_ongoing_flush_lsn(), 10); // Should still be 10 - assert!(table.ongoing_flush_lsns.contains(&10)); - assert!(!table.ongoing_flush_lsns.contains(&20)); - assert!(table.ongoing_flush_lsns.contains(&30)); + assert!(table.ongoing_flush_lsns.contains_key(&10)); + assert!(!table.ongoing_flush_lsns.contains_key(&20)); + assert!(table.ongoing_flush_lsns.contains_key(&30)); // Complete the first flush (LSN 10) - now min should be 30 table.apply_flush_result(disk_slice_1, uuid::Uuid::new_v4() /*placeholder*/); assert_eq!(table.get_min_ongoing_flush_lsn(), 30); // Now should be 30 - assert!(!table.ongoing_flush_lsns.contains(&10)); - assert!(!table.ongoing_flush_lsns.contains(&20)); - assert!(table.ongoing_flush_lsns.contains(&30)); + assert!(!table.ongoing_flush_lsns.contains_key(&10)); + assert!(!table.ongoing_flush_lsns.contains_key(&20)); + assert!(table.ongoing_flush_lsns.contains_key(&30)); // Test constraint logic with new min pending flush LSN = 30 let can_initiate_low = TableHandlerState::can_initiate_iceberg_snapshot( diff --git a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs index 9b07778fb..46a55d5e6 100644 --- a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs +++ b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs @@ -537,8 +537,15 @@ impl MooncakeTable { let table_notify_tx = self.table_notify.as_ref().unwrap().clone(); stream_state.ongoing_flush_count += 1; - - self.flush_disk_slice(&mut disk_slice, table_notify_tx, Some(xact_id), event_id); + let ongoing_flush_count = stream_state.ongoing_flush_count; + + self.flush_disk_slice( + &mut disk_slice, + table_notify_tx, + Some(xact_id), + ongoing_flush_count, + event_id, 
+ ); // Add back stream state self.transaction_stream_states.insert(xact_id, stream_state); diff --git a/src/moonlink/src/table_handler/tests.rs b/src/moonlink/src/table_handler/tests.rs index d4a59e5d4..687645551 100644 --- a/src/moonlink/src/table_handler/tests.rs +++ b/src/moonlink/src/table_handler/tests.rs @@ -350,8 +350,8 @@ async fn test_stream_delete_unflushed_non_streamed_row() { } #[tokio::test] -async fn test_streaming_transaction_periodic_flush() { - let mut env = TestEnvironment::default().await; +async fn test_hjiang_streaming_transaction_periodic_flush() { + let env = TestEnvironment::default().await; let xact_id = 201; let commit_lsn = 20; // LSN at which the transaction will eventually commit let initial_read_lsn_target = commit_lsn; // For verifying no data pre-commit @@ -386,8 +386,6 @@ async fn test_streaming_transaction_periodic_flush() { env.verify_snapshot(final_read_lsn_target, &[10, 11, 12]) .await; - - env.shutdown().await; } #[tokio::test] From 5233dcb8819089d087e496317633851d0b571937 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 27 Aug 2025 09:36:21 +0000 Subject: [PATCH 2/6] add assertion --- src/moonlink/src/storage/mooncake_table.rs | 5 ++++- .../mooncake_table/table_operation_test_utils.rs | 2 +- src/moonlink/src/storage/mooncake_table/tests.rs | 12 ++++++++---- src/moonlink/src/table_handler/tests.rs | 2 +- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/moonlink/src/storage/mooncake_table.rs b/src/moonlink/src/storage/mooncake_table.rs index f61dcb471..fe25152b1 100644 --- a/src/moonlink/src/storage/mooncake_table.rs +++ b/src/moonlink/src/storage/mooncake_table.rs @@ -64,6 +64,7 @@ use arrow_schema::Schema; use delete_vector::BatchDeletionVector; pub(crate) use disk_slice::DiskSliceWriter; use mem_slice::MemSlice; +use more_asserts as ma; pub(crate) use snapshot::SnapshotTableState; use std::collections::{BTreeMap, HashMap, HashSet}; use std::path::PathBuf; @@ -915,7 +916,9 @@ impl MooncakeTable { fn 
try_set_next_flush_lsn(&mut self, lsn: u64) { let min_pending_lsn = self.get_min_ongoing_flush_lsn(); if lsn < min_pending_lsn { - // TODO(hjiang): Add assertion that flush LSN never regresses, currently it's still buggy, I will fix in the followup PR. + if let Some(old_flush_lsn) = self.next_snapshot_task.new_flush_lsn { + ma::assert_le!(old_flush_lsn, lsn); + } self.next_snapshot_task.new_flush_lsn = Some(lsn); } } diff --git a/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs b/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs index f59d7afe7..233fc88a6 100644 --- a/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs +++ b/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs @@ -90,7 +90,7 @@ pub(crate) async fn flush_table_and_sync_no_apply( } } -/// Flush mooncake, block wait its completion. +/// Flush the given streaming transaction, block wait its completion. #[cfg(test)] pub(crate) async fn flush_stream_and_sync_no_apply( table: &mut MooncakeTable, diff --git a/src/moonlink/src/storage/mooncake_table/tests.rs b/src/moonlink/src/storage/mooncake_table/tests.rs index e654b4d6c..4b84f5a0a 100644 --- a/src/moonlink/src/storage/mooncake_table/tests.rs +++ b/src/moonlink/src/storage/mooncake_table/tests.rs @@ -2506,10 +2506,14 @@ async fn test_stream_commit_with_ongoing_flush_deletion_remapping() -> Result<() // Step 4: Flush the streaming transaction (creates disk slice) // The disk slice now contains the deletion remapping info - let disk_slice = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(4)) - .await - .expect("Disk slice should be present"); + let disk_slice = flush_stream_and_sync_no_apply( + &mut table, + &mut event_completion_rx, + xact_id, + /*lsn=*/ Some(3), + ) + .await + .expect("Disk slice should be present"); // Step 5: Commit the streaming transaction // This processes deletions and adds them to committed_deletion_log diff --git 
a/src/moonlink/src/table_handler/tests.rs b/src/moonlink/src/table_handler/tests.rs index 687645551..8511daece 100644 --- a/src/moonlink/src/table_handler/tests.rs +++ b/src/moonlink/src/table_handler/tests.rs @@ -350,7 +350,7 @@ async fn test_stream_delete_unflushed_non_streamed_row() { } #[tokio::test] -async fn test_hjiang_streaming_transaction_periodic_flush() { +async fn test_streaming_transaction_periodic_flush() { let env = TestEnvironment::default().await; let xact_id = 201; let commit_lsn = 20; // LSN at which the transaction will eventually commit From a6bde589566ccaa0672063c3ca18d802e35d20f9 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 27 Aug 2025 10:58:58 +0000 Subject: [PATCH 3/6] fix test --- src/moonlink/src/storage/mooncake_table.rs | 4 + .../src/storage/mooncake_table/tests.rs | 104 ++++++++++-------- .../mooncake_table/transaction_stream.rs | 19 ++-- src/moonlink/src/table_handler/tests.rs | 4 +- 4 files changed, 79 insertions(+), 52 deletions(-) diff --git a/src/moonlink/src/storage/mooncake_table.rs b/src/moonlink/src/storage/mooncake_table.rs index fe25152b1..124da6784 100644 --- a/src/moonlink/src/storage/mooncake_table.rs +++ b/src/moonlink/src/storage/mooncake_table.rs @@ -840,6 +840,10 @@ impl MooncakeTable { } /// Flushes the disk slice for the transaction. + /// + /// # Arguments + /// + /// * ongoing_flush_count: used to increment ongoing flush count for the given LSN. 
fn flush_disk_slice( &mut self, disk_slice: &mut DiskSliceWriter, diff --git a/src/moonlink/src/storage/mooncake_table/tests.rs b/src/moonlink/src/storage/mooncake_table/tests.rs index 4b84f5a0a..fbc922a58 100644 --- a/src/moonlink/src/storage/mooncake_table/tests.rs +++ b/src/moonlink/src/storage/mooncake_table/tests.rs @@ -737,10 +737,14 @@ async fn test_streaming_begin_flush_commit_end_flush() { // Begin the flush // This will drain the mem slice and add its relevant state to the stream state - let disk_slice = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(lsn)) - .await - .expect("Disk slice should be present"); + let disk_slice = flush_stream_and_sync_no_apply( + &mut table, + &mut event_completion_rx, + xact_id, + Some(lsn + 1), + ) + .await + .expect("Disk slice should be present"); // Wait to apply the flush to simulate an async flush that hasn't returned // Commit the transaction while the flush is pending @@ -808,25 +812,25 @@ async fn test_streaming_begin_flush_commit_end_flush_multiple() { // Begin the flush // This will drain the mem slice and add its relevant state to the stream state - let disk_slice1 = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(lsn)) - .await - .unwrap(); - - let row2 = test_row(2, "B", 21); - table.append_in_stream_batch(row2, xact_id).unwrap(); - - // Begin the flush - // This will drain the mem slice and add its relevant state to the stream state - let disk_slice2 = flush_stream_and_sync_no_apply( + let disk_slice1 = flush_stream_and_sync_no_apply( &mut table, &mut event_completion_rx, xact_id, - Some(lsn + 1), + /*lsn=*/ None, ) .await .unwrap(); + let row2 = test_row(2, "B", 21); + table.append_in_stream_batch(row2, xact_id).unwrap(); + + // Begin the flush + // This will drain the mem slice and add its relevant state to the stream state + let disk_slice2 = + flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(lsn)) + 
.await + .unwrap(); + // Wait to apply the flush to simulate an async flush that hasn't returned // Commit the transaction while the flush is pending // This will move the state from the stream state to the snapshot task @@ -987,10 +991,14 @@ async fn test_streaming_begin_flush_delete_commit_end_flush() { // Begin the flush // This will drain the mem slice and add its relevant state to the stream state - let disk_slice = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(lsn)) - .await - .unwrap(); + let disk_slice = flush_stream_and_sync_no_apply( + &mut table, + &mut event_completion_rx, + xact_id, + Some(lsn + 1), + ) + .await + .unwrap(); // Wait to apply the flush to simulate an async flush that hasn't returned // Commit the transaction while the flush is pending @@ -2216,10 +2224,14 @@ async fn test_streaming_batch_id_mismatch_with_data_compaction() -> Result<()> { // Step 3: Flush the streaming transaction // This creates a disk slice with the original batch IDs - let disk_slice = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(20)) - .await - .expect("Disk slice should be present"); + let disk_slice = flush_stream_and_sync_no_apply( + &mut table, + &mut event_completion_rx, + xact_id, + /*lsn=*/ None, + ) + .await + .expect("Disk slice should be present"); // Step 4: Delete one of the rows in the streaming transaction // This can cause some batches to become empty after filtering @@ -2292,12 +2304,6 @@ async fn test_streaming_empty_batch_filtering() -> Result<()> { table.append_in_stream_batch(test_row(2, "ToDelete", 21), xact_id1)?; table.append_in_stream_batch(test_row(3, "AlsoDelete", 22), xact_id1)?; - // Flush stream 1 - let disk_slice1 = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id1, Some(10)) - .await - .expect("Stream 1 disk slice should be present"); - // Delete all rows in stream 1 (making batches empty after filtering) table 
.delete_in_stream_batch(test_row(2, "ToDelete", 21), xact_id1) @@ -2306,8 +2312,18 @@ async fn test_streaming_empty_batch_filtering() -> Result<()> { .delete_in_stream_batch(test_row(3, "AlsoDelete", 22), xact_id1) .await; + // Flush stream 1 + let disk_slice1 = flush_stream_and_sync_no_apply( + &mut table, + &mut event_completion_rx, + xact_id1, + /*lsn=*/ Some(11), + ) + .await + .expect("Stream 1 disk slice should be present"); + // Commit stream 1 - table.commit_transaction_stream_impl(xact_id1, 11)?; + table.commit_transaction_stream_impl(xact_id1, /*lsn=*/ 11)?; table.apply_stream_flush_result( xact_id1, disk_slice1, @@ -2368,7 +2384,7 @@ async fn test_batch_id_removal_assertion_direct() -> Result<()> { // This creates a disk slice that contains the original batch ID // but the batch will be filtered out during processing (empty after deletion) let disk_slice = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(10)) + flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(11)) .await .expect("Disk slice should be present"); @@ -2506,14 +2522,10 @@ async fn test_stream_commit_with_ongoing_flush_deletion_remapping() -> Result<() // Step 4: Flush the streaming transaction (creates disk slice) // The disk slice now contains the deletion remapping info - let disk_slice = flush_stream_and_sync_no_apply( - &mut table, - &mut event_completion_rx, - xact_id, - /*lsn=*/ Some(3), - ) - .await - .expect("Disk slice should be present"); + let disk_slice = + flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(3)) + .await + .expect("Disk slice should be present"); // Step 5: Commit the streaming transaction // This processes deletions and adds them to committed_deletion_log @@ -2800,10 +2812,14 @@ async fn test_streaming_commit_before_flush_finishes_sets_flush_lsn() -> Result< table.append_in_stream_batch(row, xact_id)?; // Begin a streaming flush WITHOUT a writer LSN to simulate a 
periodic flush before commit - let disk_slice = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, None) - .await - .expect("Disk slice should be present"); + let disk_slice = flush_stream_and_sync_no_apply( + &mut table, + &mut event_completion_rx, + xact_id, + Some(commit_lsn), + ) + .await + .expect("Disk slice should be present"); // Commit the transaction while the flush is still pending table diff --git a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs index 46a55d5e6..756b1b1d5 100644 --- a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs +++ b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs @@ -417,12 +417,6 @@ impl MooncakeTable { .unwrap(); } - // Perform table flush completion notification. - if let Some(lsn) = disk_slice.lsn() { - self.remove_ongoing_flush_lsn(lsn); - self.try_set_next_flush_lsn(lsn); - } - let stream_state = self .transaction_stream_states .get_mut(&xact_id) @@ -446,6 +440,7 @@ impl MooncakeTable { let should_remove = stream_state.ongoing_flush_count == 0; let _ = stream_state; + self.remove_ongoing_flush_lsn(commit_lsn); self.try_set_next_flush_lsn(commit_lsn); self.next_snapshot_task.new_disk_slices.push(disk_slice); if should_remove { @@ -507,6 +502,9 @@ impl MooncakeTable { } } + /// # Arguments + /// + /// * lsn: commit LSN for the current streaming transaction if assigned. pub fn flush_stream( &mut self, xact_id: u32, @@ -537,7 +535,14 @@ impl MooncakeTable { let table_notify_tx = self.table_notify.as_ref().unwrap().clone(); stream_state.ongoing_flush_count += 1; - let ongoing_flush_count = stream_state.ongoing_flush_count; + + // For streaming transactions, only record ongoing flush operations when commit; + // otherwise it's completely invisible to the outside world. 
+ let ongoing_flush_count = if lsn.is_some() { + stream_state.ongoing_flush_count + } else { + 0 + }; self.flush_disk_slice( &mut disk_slice, diff --git a/src/moonlink/src/table_handler/tests.rs b/src/moonlink/src/table_handler/tests.rs index 8511daece..d4a59e5d4 100644 --- a/src/moonlink/src/table_handler/tests.rs +++ b/src/moonlink/src/table_handler/tests.rs @@ -351,7 +351,7 @@ async fn test_stream_delete_unflushed_non_streamed_row() { #[tokio::test] async fn test_streaming_transaction_periodic_flush() { - let env = TestEnvironment::default().await; + let mut env = TestEnvironment::default().await; let xact_id = 201; let commit_lsn = 20; // LSN at which the transaction will eventually commit let initial_read_lsn_target = commit_lsn; // For verifying no data pre-commit @@ -386,6 +386,8 @@ async fn test_streaming_transaction_periodic_flush() { env.verify_snapshot(final_read_lsn_target, &[10, 11, 12]) .await; + + env.shutdown().await; } #[tokio::test] From d0d83486a659d8fd977eee645daa8d620af1790c Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 27 Aug 2025 11:08:21 +0000 Subject: [PATCH 4/6] fix test --- src/moonlink/src/storage/mooncake_table/tests.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/moonlink/src/storage/mooncake_table/tests.rs b/src/moonlink/src/storage/mooncake_table/tests.rs index fbc922a58..1a2d2702b 100644 --- a/src/moonlink/src/storage/mooncake_table/tests.rs +++ b/src/moonlink/src/storage/mooncake_table/tests.rs @@ -1633,10 +1633,17 @@ async fn test_mixed_regular_and_streaming_lsn_ordering() -> Result<()> { // Start streaming flush with LSN 50 (lower than regular, but allowed for streaming) table.append_in_stream_batch(test_row(2, "B", 21), xact_id)?; - let streaming_disk_slice = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(50)) - .await - .expect("Streaming disk slice should be present"); + let streaming_disk_slice = flush_stream_and_sync_no_apply( + &mut table, + 
&mut event_completion_rx, + xact_id, + /*lsn=*/ Some(50), + ) + .await + .expect("Streaming disk slice should be present"); + table + .commit_transaction_stream_impl(xact_id, /*lsn=*/ 50) + .unwrap(); // Verify both are tracked and min is 50 assert!(table.ongoing_flush_lsns.contains_key(&100)); From db1d13a7370701729bf52259e0430ebf61f5e7d9 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 27 Aug 2025 11:23:01 +0000 Subject: [PATCH 5/6] fix test --- .../src/storage/mooncake_table/tests.rs | 17 +++++++++-------- .../mooncake_table/transaction_stream.rs | 2 ++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/moonlink/src/storage/mooncake_table/tests.rs b/src/moonlink/src/storage/mooncake_table/tests.rs index 1a2d2702b..9e501d8c6 100644 --- a/src/moonlink/src/storage/mooncake_table/tests.rs +++ b/src/moonlink/src/storage/mooncake_table/tests.rs @@ -1026,14 +1026,13 @@ async fn test_streaming_begin_flush_delete_commit_end_flush() { table.delete_in_stream_batch(row2, xact_id2).await; // Commit the new transaction - table - .commit_transaction_stream_impl(xact_id2, lsn2 + 1) - .unwrap(); - let disk_slice2 = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id2, Some(lsn2)) + flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id2, Some(lsn2 + 1)) .await .unwrap(); + table + .commit_transaction_stream_impl(xact_id2, lsn2 + 1) + .unwrap(); table.apply_stream_flush_result( xact_id, @@ -1353,14 +1352,16 @@ async fn test_streaming_flush_lsns_tracking() -> Result<()> { // Flush streaming transactions with different LSNs (can be out of order) let disk_slice_1 = - flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id_1, Some(100)) + flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id_1, Some(50)) .await .expect("Disk slice 1 should be present"); + table.commit_transaction_stream_impl(xact_id_1, /*lsn=*/ 50).unwrap(); let disk_slice_2 = - 
flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id_2, Some(50)) + flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id_2, Some(100)) .await .expect("Disk slice 2 should be present"); + table.commit_transaction_stream_impl(xact_id_2, /*lsn=*/ 100).unwrap(); // Verify both streaming LSNs are tracked assert!(table.ongoing_flush_lsns.contains_key(&100)); @@ -2235,7 +2236,7 @@ async fn test_streaming_batch_id_mismatch_with_data_compaction() -> Result<()> { &mut table, &mut event_completion_rx, xact_id, - /*lsn=*/ None, + /*lsn=*/ Some(21), ) .await .expect("Disk slice should be present"); diff --git a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs index 756b1b1d5..178c87a8c 100644 --- a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs +++ b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs @@ -585,6 +585,8 @@ impl MooncakeTable { ); } for (id, _) in stream_state.new_record_batches.iter() { + println!("when ingesting lsn {} has id = {}", lsn, *id); + assert!( self.next_snapshot_task .flushing_batch_lsn_map From daf05c92801bd6ad7d065c0f81ca6c4631833f3d Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 27 Aug 2025 12:20:13 +0000 Subject: [PATCH 6/6] fix test --- .../src/storage/mooncake_table/tests.rs | 26 ++++++++++++------- .../mooncake_table/transaction_stream.rs | 2 -- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/moonlink/src/storage/mooncake_table/tests.rs b/src/moonlink/src/storage/mooncake_table/tests.rs index 3e6fa133b..038d6608a 100644 --- a/src/moonlink/src/storage/mooncake_table/tests.rs +++ b/src/moonlink/src/storage/mooncake_table/tests.rs @@ -1026,10 +1026,14 @@ async fn test_streaming_begin_flush_delete_commit_end_flush() { table.delete_in_stream_batch(row2, xact_id2).await; // Commit the new transaction - let disk_slice2 = - flush_stream_and_sync_no_apply(&mut table, &mut 
event_completion_rx, xact_id2, Some(lsn2 + 1)) - .await - .unwrap(); + let disk_slice2 = flush_stream_and_sync_no_apply( + &mut table, + &mut event_completion_rx, + xact_id2, + Some(lsn2 + 1), + ) + .await + .unwrap(); table .commit_transaction_stream_impl(xact_id2, lsn2 + 1) .unwrap(); @@ -1355,13 +1359,17 @@ async fn test_streaming_flush_lsns_tracking() -> Result<()> { flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id_1, Some(50)) .await .expect("Disk slice 1 should be present"); - table.commit_transaction_stream_impl(xact_id_1, /*lsn=*/ 50).unwrap(); + table + .commit_transaction_stream_impl(xact_id_1, /*lsn=*/ 50) + .unwrap(); let disk_slice_2 = flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id_2, Some(100)) .await .expect("Disk slice 2 should be present"); - table.commit_transaction_stream_impl(xact_id_2, /*lsn=*/ 100).unwrap(); + table + .commit_transaction_stream_impl(xact_id_2, /*lsn=*/ 100) + .unwrap(); // Verify both streaming LSNs are tracked assert!(table.ongoing_flush_lsns.contains_key(&100)); @@ -1370,7 +1378,7 @@ async fn test_streaming_flush_lsns_tracking() -> Result<()> { // Mix with regular flush (must be higher than previous regular flush) append_rows(&mut table, vec![test_row(3, "C", 22)])?; - table.commit(3); + table.commit(103); let disk_slice_3 = flush_table_and_sync_no_apply(&mut table, &mut event_completion_rx, 75) .await .expect("Disk slice 3 should be present"); @@ -2938,8 +2946,8 @@ async fn test_streaming_batch_id_assignment() -> Result<()> { .unwrap(); // Verify both streaming LSNs are tracked - assert!(table.ongoing_flush_lsns.contains(&100)); - assert!(table.ongoing_flush_lsns.contains(&50)); + assert!(table.ongoing_flush_lsns.contains_key(&100)); + assert!(table.ongoing_flush_lsns.contains_key(&50)); assert_eq!(table.get_min_ongoing_flush_lsn(), 50); // Complete streaming flushes diff --git a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs 
b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs index 178c87a8c..756b1b1d5 100644 --- a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs +++ b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs @@ -585,8 +585,6 @@ impl MooncakeTable { ); } for (id, _) in stream_state.new_record_batches.iter() { - println!("when ingesting lsn {} has id = {}", lsn, *id); - assert!( self.next_snapshot_task .flushing_batch_lsn_map