diff --git a/src/moonlink/src/storage/mooncake_table.rs b/src/moonlink/src/storage/mooncake_table.rs
index 30061743a..22cde1be7 100644
--- a/src/moonlink/src/storage/mooncake_table.rs
+++ b/src/moonlink/src/storage/mooncake_table.rs
@@ -64,8 +64,9 @@ use arrow_schema::Schema;
 use delete_vector::BatchDeletionVector;
 pub(crate) use disk_slice::DiskSliceWriter;
 use mem_slice::MemSlice;
+use more_asserts as ma;
 pub(crate) use snapshot::SnapshotTableState;
-use std::collections::{BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::path::PathBuf;
 use std::sync::Arc;
 use table_snapshot::{IcebergSnapshotImportResult, IcebergSnapshotIndexMergeResult};
@@ -462,7 +463,8 @@ pub struct MooncakeTable {
     wal_manager: WalManager,

     /// LSN of ongoing flushes.
-    pub ongoing_flush_lsns: BTreeSet<u64>,
+    /// Maps from LSN to its count.
+    pub ongoing_flush_lsns: BTreeMap<u64, u32>,

     /// Table replay sender.
     event_replay_tx: Option>,
@@ -565,7 +567,7 @@ impl MooncakeTable {
             last_iceberg_snapshot_lsn,
             table_notify: None,
             wal_manager,
-            ongoing_flush_lsns: BTreeSet::new(),
+            ongoing_flush_lsns: BTreeMap::new(),
             event_replay_tx: None,
         })
     }
@@ -840,15 +842,20 @@ impl MooncakeTable {
     }

     /// Flushes the disk slice for the transaction.
+    ///
+    /// # Arguments
+    ///
+    /// * ongoing_flush_count: the amount by which to increment the ongoing flush count for the given LSN.
     fn flush_disk_slice(
         &mut self,
         disk_slice: &mut DiskSliceWriter,
         table_notify_tx: Sender,
         xact_id: Option<u32>,
+        ongoing_flush_count: u32,
         event_id: uuid::Uuid,
     ) {
         if let Some(lsn) = disk_slice.lsn() {
-            self.insert_ongoing_flush_lsn(lsn);
+            self.insert_ongoing_flush_lsn(lsn, ongoing_flush_count);
         } else {
             assert!(
                 xact_id.is_some(),
@@ -915,31 +922,41 @@ impl MooncakeTable {
     fn try_set_next_flush_lsn(&mut self, lsn: u64) {
         let min_pending_lsn = self.get_min_ongoing_flush_lsn();
         if lsn < min_pending_lsn {
+            if let Some(old_flush_lsn) = self.next_snapshot_task.new_flush_lsn {
+                ma::assert_le!(old_flush_lsn, lsn);
+            }
             self.next_snapshot_task.new_flush_lsn = Some(lsn);
         }
     }
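The new assertion in `try_set_next_flush_lsn` encodes a monotonicity rule worth spelling out. A minimal sketch of that rule, with the state flattened into plain parameters for illustration (the real method lives on `MooncakeTable`), assuming only the `more_asserts` crate as a dependency:

```rust
// Sketch of the rule try_set_next_flush_lsn now enforces: the snapshot
// task's flush LSN may only move forward, never backward, and only to an
// LSN strictly below every ongoing flush.
fn try_set_next_flush_lsn(new_flush_lsn: &mut Option<u64>, min_ongoing_flush_lsn: u64, lsn: u64) {
    if lsn < min_ongoing_flush_lsn {
        if let Some(old_flush_lsn) = *new_flush_lsn {
            // Regressing the flush LSN would let a later iceberg snapshot
            // claim durability for LSNs it no longer covers.
            more_asserts::assert_le!(old_flush_lsn, lsn);
        }
        *new_flush_lsn = Some(lsn);
    }
}
```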
     // We fallback to u64::MAX if there are no pending flush LSNs so that the LSN is always greater than the flush LSN and the iceberg snapshot can proceed.
     pub fn get_min_ongoing_flush_lsn(&self) -> u64 {
-        self.ongoing_flush_lsns
-            .iter()
-            .next()
-            .copied()
-            .unwrap_or(u64::MAX)
+        if let Some((lsn, _)) = self.ongoing_flush_lsns.first_key_value() {
+            return *lsn;
+        }
+        u64::MAX
     }

-    pub fn insert_ongoing_flush_lsn(&mut self, lsn: u64) {
-        assert!(
-            self.ongoing_flush_lsns.insert(lsn),
-            "LSN {lsn} already in pending flush LSNs"
-        );
+    pub fn insert_ongoing_flush_lsn(&mut self, lsn: u64, count: u32) {
+        *self.ongoing_flush_lsns.entry(lsn).or_insert(0) += count;
     }

     pub fn remove_ongoing_flush_lsn(&mut self, lsn: u64) {
-        assert!(
-            self.ongoing_flush_lsns.remove(&lsn),
-            "LSN {lsn} not found in pending flush LSNs"
-        );
+        use std::collections::btree_map::Entry;
+
+        match self.ongoing_flush_lsns.entry(lsn) {
+            Entry::Occupied(mut entry) => {
+                let counter = entry.get_mut();
+                if *counter > 1 {
+                    *counter -= 1;
+                } else {
+                    entry.remove();
+                }
+            }
+            Entry::Vacant(_) => {
+                panic!("Tried to remove LSN {lsn}, but it is not tracked");
+            }
+        }
     }

     pub fn has_ongoing_flush(&self) -> bool {
@@ -1205,7 +1222,7 @@ impl MooncakeTable {
         );
         let table_notify_tx = self.table_notify.as_ref().unwrap().clone();

-        if self.mem_slice.is_empty() || self.ongoing_flush_lsns.contains(&lsn) {
+        if self.mem_slice.is_empty() || self.ongoing_flush_lsns.contains_key(&lsn) {
             self.try_set_next_flush_lsn(lsn);
             tokio::task::spawn(async move {
                 table_notify_tx
@@ -1238,6 +1255,7 @@ impl MooncakeTable {
             &mut disk_slice,
             table_notify_tx,
             /*xact_id=*/ None,
+            /*ongoing_flush_count=*/ 1,
             event_id,
         );
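Taken together, the three methods above implement a small refcounted ordered multiset. A self-contained sketch of the same pattern (`OngoingFlushLsns` is an illustrative wrapper type, not something the PR introduces; in the PR these are methods on `MooncakeTable` over the `ongoing_flush_lsns` field):

```rust
use std::collections::{btree_map::Entry, BTreeMap};

#[derive(Default)]
struct OngoingFlushLsns(BTreeMap<u64, u32>);

impl OngoingFlushLsns {
    /// Smallest tracked LSN, or u64::MAX when no flush is in flight.
    fn min_lsn(&self) -> u64 {
        self.0.first_key_value().map(|(lsn, _)| *lsn).unwrap_or(u64::MAX)
    }

    /// Record `count` in-flight flushes sharing `lsn`.
    fn insert(&mut self, lsn: u64, count: u32) {
        *self.0.entry(lsn).or_insert(0) += count;
    }

    /// Release one flush for `lsn`; the key disappears with its last holder.
    fn remove(&mut self, lsn: u64) {
        match self.0.entry(lsn) {
            Entry::Occupied(mut entry) => {
                if *entry.get() > 1 {
                    *entry.get_mut() -= 1;
                } else {
                    entry.remove();
                }
            }
            Entry::Vacant(_) => panic!("LSN {lsn} is not tracked"),
        }
    }
}

fn main() {
    let mut lsns = OngoingFlushLsns::default();
    lsns.insert(50, 2); // e.g. two streaming flushes published under one commit LSN
    lsns.insert(75, 1);
    lsns.remove(50);
    assert_eq!(lsns.min_lsn(), 50); // still held by the second flush
    lsns.remove(50);
    assert_eq!(lsns.min_lsn(), 75);
}
```

The count is what motivates the `BTreeSet` → `BTreeMap` switch: several in-flight flushes may now legitimately share one LSN, as in the streaming-transaction hunks below where all of a transaction's pending flushes are recorded under its commit LSN.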
diff --git a/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs b/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs
index f59d7afe7..233fc88a6 100644
--- a/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs
+++ b/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs
@@ -90,7 +90,7 @@ pub(crate) async fn flush_table_and_sync_no_apply(
     }
 }

-/// Flush mooncake, block wait its completion.
+/// Flush the given streaming transaction, block wait its completion.
 #[cfg(test)]
 pub(crate) async fn flush_stream_and_sync_no_apply(
     table: &mut MooncakeTable,
diff --git a/src/moonlink/src/storage/mooncake_table/tests.rs b/src/moonlink/src/storage/mooncake_table/tests.rs
index 5f7498fc7..038d6608a 100644
--- a/src/moonlink/src/storage/mooncake_table/tests.rs
+++ b/src/moonlink/src/storage/mooncake_table/tests.rs
@@ -737,10 +737,14 @@ async fn test_streaming_begin_flush_commit_end_flush() {

     // Begin the flush
     // This will drain the mem slice and add its relevant state to the stream state
-    let disk_slice =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(lsn))
-            .await
-            .expect("Disk slice should be present");
+    let disk_slice = flush_stream_and_sync_no_apply(
+        &mut table,
+        &mut event_completion_rx,
+        xact_id,
+        Some(lsn + 1),
+    )
+    .await
+    .expect("Disk slice should be present");

     // Wait to apply the flush to simulate an async flush that hasn't returned
     // Commit the transaction while the flush is pending
@@ -808,25 +812,25 @@ async fn test_streaming_begin_flush_commit_end_flush_multiple() {

     // Begin the flush
     // This will drain the mem slice and add its relevant state to the stream state
-    let disk_slice1 =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(lsn))
-            .await
-            .unwrap();
-
-    let row2 = test_row(2, "B", 21);
-    table.append_in_stream_batch(row2, xact_id).unwrap();
-
-    // Begin the flush
-    // This will drain the mem slice and add its relevant state to the stream state
-    let disk_slice2 = flush_stream_and_sync_no_apply(
+    let disk_slice1 = flush_stream_and_sync_no_apply(
         &mut table,
         &mut event_completion_rx,
         xact_id,
-        Some(lsn + 1),
+        /*lsn=*/ None,
     )
     .await
     .unwrap();

+    let row2 = test_row(2, "B", 21);
+    table.append_in_stream_batch(row2, xact_id).unwrap();
+
+    // Begin the flush
+    // This will drain the mem slice and add its relevant state to the stream state
+    let disk_slice2 =
+        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(lsn))
+            .await
+            .unwrap();
+
     // Wait to apply the flush to simulate an async flush that hasn't returned
     // Commit the transaction while the flush is pending
     // This will move the state from the stream state to the snapshot task
@@ -987,10 +991,14 @@ async fn test_streaming_begin_flush_delete_commit_end_flush() {

     // Begin the flush
     // This will drain the mem slice and add its relevant state to the stream state
-    let disk_slice =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(lsn))
-            .await
-            .unwrap();
+    let disk_slice = flush_stream_and_sync_no_apply(
+        &mut table,
+        &mut event_completion_rx,
+        xact_id,
+        Some(lsn + 1),
+    )
+    .await
+    .unwrap();

     // Wait to apply the flush to simulate an async flush that hasn't returned
     // Commit the transaction while the flush is pending
@@ -1018,15 +1026,18 @@ async fn test_streaming_begin_flush_delete_commit_end_flush() {
     table.delete_in_stream_batch(row2, xact_id2).await;

     // Commit the new transaction
+    let disk_slice2 = flush_stream_and_sync_no_apply(
+        &mut table,
+        &mut event_completion_rx,
+        xact_id2,
+        Some(lsn2 + 1),
+    )
+    .await
+    .unwrap();
     table
         .commit_transaction_stream_impl(xact_id2, lsn2 + 1)
         .unwrap();

-    let disk_slice2 =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id2, Some(lsn2))
-            .await
-            .unwrap();
-
     table.apply_stream_flush_result(
         xact_id,
         disk_slice,
@@ -1300,9 +1311,9 @@ async fn test_ongoing_flush_lsns_tracking() -> Result<()> {
         .expect("Disk slice 3 should be present");

     // Verify all LSNs are tracked
-    assert!(table.ongoing_flush_lsns.contains(&5));
-    assert!(table.ongoing_flush_lsns.contains(&10));
-    assert!(table.ongoing_flush_lsns.contains(&15));
+    assert!(table.ongoing_flush_lsns.contains_key(&5));
+    assert!(table.ongoing_flush_lsns.contains_key(&10));
+    assert!(table.ongoing_flush_lsns.contains_key(&15));
     assert_eq!(table.ongoing_flush_lsns.len(), 3);

     // Verify min is correctly calculated (should be 5)
@@ -1310,15 +1321,15 @@ async fn test_ongoing_flush_lsns_tracking() -> Result<()> {

     // Complete flush with LSN 10 (out of order completion)
     table.apply_flush_result(disk_slice_2, uuid::Uuid::new_v4() /*placeholder*/);
-    assert!(table.ongoing_flush_lsns.contains(&5));
-    assert!(!table.ongoing_flush_lsns.contains(&10));
-    assert!(table.ongoing_flush_lsns.contains(&15));
+    assert!(table.ongoing_flush_lsns.contains_key(&5));
+    assert!(!table.ongoing_flush_lsns.contains_key(&10));
+    assert!(table.ongoing_flush_lsns.contains_key(&15));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 5); // Still 5

     // Complete flush with LSN 5
     table.apply_flush_result(disk_slice_1, uuid::Uuid::new_v4() /*placeholder*/);
-    assert!(!table.ongoing_flush_lsns.contains(&5));
-    assert!(table.ongoing_flush_lsns.contains(&15));
+    assert!(!table.ongoing_flush_lsns.contains_key(&5));
+    assert!(table.ongoing_flush_lsns.contains_key(&15));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 15); // Now 15

     // Complete last flush
@@ -1345,31 +1356,37 @@ async fn test_streaming_flush_lsns_tracking() -> Result<()> {

     // Flush streaming transactions with different LSNs (can be out of order)
     let disk_slice_1 =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id_1, Some(100))
+        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id_1, Some(50))
             .await
             .expect("Disk slice 1 should be present");
+    table
+        .commit_transaction_stream_impl(xact_id_1, /*lsn=*/ 50)
+        .unwrap();

     let disk_slice_2 =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id_2, Some(50))
+        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id_2, Some(100))
             .await
             .expect("Disk slice 2 should be present");
+    table
+        .commit_transaction_stream_impl(xact_id_2, /*lsn=*/ 100)
+        .unwrap();

     // Verify both streaming LSNs are tracked
-    assert!(table.ongoing_flush_lsns.contains(&100));
-    assert!(table.ongoing_flush_lsns.contains(&50));
+    assert!(table.ongoing_flush_lsns.contains_key(&100));
+    assert!(table.ongoing_flush_lsns.contains_key(&50));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 50);

     // Mix with regular flush (must be higher than previous regular flush)
     append_rows(&mut table, vec![test_row(3, "C", 22)])?;
-    table.commit(3);
+    table.commit(103);
     let disk_slice_3 = flush_table_and_sync_no_apply(&mut table, &mut event_completion_rx, 75)
         .await
         .expect("Disk slice 3 should be present");

     // Verify all three LSNs are tracked
-    assert!(table.ongoing_flush_lsns.contains(&100));
-    assert!(table.ongoing_flush_lsns.contains(&50));
-    assert!(table.ongoing_flush_lsns.contains(&75));
+    assert!(table.ongoing_flush_lsns.contains_key(&100));
+    assert!(table.ongoing_flush_lsns.contains_key(&50));
+    assert!(table.ongoing_flush_lsns.contains_key(&75));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 50);

     // Complete streaming flushes
@@ -1580,22 +1597,22 @@ async fn test_out_of_order_flush_completion() -> Result<()> {
         .expect("Disk slice should be present");

     // Verify all are pending and min is 10
-    assert!(table.ongoing_flush_lsns.contains(&10));
-    assert!(table.ongoing_flush_lsns.contains(&20));
-    assert!(table.ongoing_flush_lsns.contains(&30));
+    assert!(table.ongoing_flush_lsns.contains_key(&10));
+    assert!(table.ongoing_flush_lsns.contains_key(&20));
+    assert!(table.ongoing_flush_lsns.contains_key(&30));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 10);

     // Complete flush 30 first (out of order)
     table.apply_flush_result(disk_slice_30, uuid::Uuid::new_v4() /*placeholder*/);
-    assert!(!table.ongoing_flush_lsns.contains(&30));
-    assert!(table.ongoing_flush_lsns.contains(&10));
-    assert!(table.ongoing_flush_lsns.contains(&20));
+    assert!(!table.ongoing_flush_lsns.contains_key(&30));
+    assert!(table.ongoing_flush_lsns.contains_key(&10));
+    assert!(table.ongoing_flush_lsns.contains_key(&20));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 10); // Still 10

     // Complete flush 10 (should update min to 20)
     table.apply_flush_result(disk_slice_10, uuid::Uuid::new_v4() /*placeholder*/);
-    assert!(!table.ongoing_flush_lsns.contains(&10));
-    assert!(table.ongoing_flush_lsns.contains(&20));
+    assert!(!table.ongoing_flush_lsns.contains_key(&10));
+    assert!(table.ongoing_flush_lsns.contains_key(&20));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 20);

     // Complete flush 20 (should clear all)
@@ -1625,14 +1642,21 @@ async fn test_mixed_regular_and_streaming_lsn_ordering() -> Result<()> {

     // Start streaming flush with LSN 50 (lower than regular, but allowed for streaming)
     table.append_in_stream_batch(test_row(2, "B", 21), xact_id)?;
-    let streaming_disk_slice =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(50))
-            .await
-            .expect("Streaming disk slice should be present");
+    let streaming_disk_slice = flush_stream_and_sync_no_apply(
+        &mut table,
+        &mut event_completion_rx,
+        xact_id,
+        /*lsn=*/ Some(50),
+    )
+    .await
+    .expect("Streaming disk slice should be present");
+    table
+        .commit_transaction_stream_impl(xact_id, /*lsn=*/ 50)
+        .unwrap();

     // Verify both are tracked and min is 50
-    assert!(table.ongoing_flush_lsns.contains(&100));
-    assert!(table.ongoing_flush_lsns.contains(&50));
+    assert!(table.ongoing_flush_lsns.contains_key(&100));
+    assert!(table.ongoing_flush_lsns.contains_key(&50));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 50);

     // According to table handler logic, iceberg snapshots with flush_lsn >= 50 should be blocked
@@ -1662,8 +1686,8 @@ async fn test_mixed_regular_and_streaming_lsn_ordering() -> Result<()> {
         streaming_disk_slice,
         uuid::Uuid::new_v4(), /*placeholder*/
     );
-    assert!(!table.ongoing_flush_lsns.contains(&50));
-    assert!(table.ongoing_flush_lsns.contains(&100));
+    assert!(!table.ongoing_flush_lsns.contains_key(&50));
+    assert!(table.ongoing_flush_lsns.contains_key(&100));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 100);

     // Now iceberg snapshots with flush_lsn < 100 should be allowed
@@ -1988,7 +2012,7 @@ async fn test_iceberg_snapshot_blocked_by_ongoing_flushes() -> Result<()> {
     table.commit(2);

     // Verify we have pending flushes
-    assert!(table.ongoing_flush_lsns.contains(&30));
+    assert!(table.ongoing_flush_lsns.contains_key(&30));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 30);

     // Create a mooncake snapshot - this will create an iceberg payload
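The tests above exercise the snapshot-gating rule from the table handler. A hedged reduction of that rule (the real check is `TableHandlerState::can_initiate_iceberg_snapshot`, whose full signature is not shown in this diff; this free function only illustrates the comparison):

```rust
// An iceberg snapshot at `snapshot_flush_lsn` may proceed only if every
// still-ongoing flush targets a strictly larger LSN. With no ongoing
// flushes at all, get_min_ongoing_flush_lsn() returns u64::MAX, so any
// snapshot passes the check.
fn can_initiate_iceberg_snapshot(snapshot_flush_lsn: u64, min_ongoing_flush_lsn: u64) -> bool {
    snapshot_flush_lsn < min_ongoing_flush_lsn
}
```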
@@ -2090,9 +2114,9 @@ async fn test_out_of_order_flush_completion_with_iceberg_snapshots() -> Result<()> {

     // Verify all pending flushes and min pending LSN
     assert_eq!(table.get_min_ongoing_flush_lsn(), 10);
-    assert!(table.ongoing_flush_lsns.contains(&10));
-    assert!(table.ongoing_flush_lsns.contains(&20));
-    assert!(table.ongoing_flush_lsns.contains(&30));
+    assert!(table.ongoing_flush_lsns.contains_key(&10));
+    assert!(table.ongoing_flush_lsns.contains_key(&20));
+    assert!(table.ongoing_flush_lsns.contains_key(&30));

     // Create snapshot and test constraint with min_ongoing_flush_lsn = 10
     let created = table.create_snapshot(SnapshotOption {
@@ -2136,16 +2160,16 @@ async fn test_out_of_order_flush_completion_with_iceberg_snapshots() -> Result<()> {
     // Complete flushes OUT OF ORDER - complete middle one first (LSN 20)
     table.apply_flush_result(disk_slice_2, uuid::Uuid::new_v4() /*placeholder*/);
     assert_eq!(table.get_min_ongoing_flush_lsn(), 10); // Should still be 10
-    assert!(table.ongoing_flush_lsns.contains(&10));
-    assert!(!table.ongoing_flush_lsns.contains(&20));
-    assert!(table.ongoing_flush_lsns.contains(&30));
+    assert!(table.ongoing_flush_lsns.contains_key(&10));
+    assert!(!table.ongoing_flush_lsns.contains_key(&20));
+    assert!(table.ongoing_flush_lsns.contains_key(&30));

     // Complete the first flush (LSN 10) - now min should be 30
     table.apply_flush_result(disk_slice_1, uuid::Uuid::new_v4() /*placeholder*/);
     assert_eq!(table.get_min_ongoing_flush_lsn(), 30); // Now should be 30
-    assert!(!table.ongoing_flush_lsns.contains(&10));
-    assert!(!table.ongoing_flush_lsns.contains(&20));
-    assert!(table.ongoing_flush_lsns.contains(&30));
+    assert!(!table.ongoing_flush_lsns.contains_key(&10));
+    assert!(!table.ongoing_flush_lsns.contains_key(&20));
+    assert!(table.ongoing_flush_lsns.contains_key(&30));

     // Test constraint logic with new min pending flush LSN = 30
     let can_initiate_low = TableHandlerState::can_initiate_iceberg_snapshot(
@@ -2216,10 +2240,14 @@ async fn test_streaming_batch_id_mismatch_with_data_compaction() -> Result<()> {

     // Step 3: Flush the streaming transaction
     // This creates a disk slice with the original batch IDs
-    let disk_slice =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(20))
-            .await
-            .expect("Disk slice should be present");
+    let disk_slice = flush_stream_and_sync_no_apply(
+        &mut table,
+        &mut event_completion_rx,
+        xact_id,
+        /*lsn=*/ Some(21),
+    )
+    .await
+    .expect("Disk slice should be present");

     // Step 4: Delete one of the rows in the streaming transaction
     // This can cause some batches to become empty after filtering
@@ -2292,12 +2320,6 @@ async fn test_streaming_empty_batch_filtering() -> Result<()> {
     table.append_in_stream_batch(test_row(2, "ToDelete", 21), xact_id1)?;
     table.append_in_stream_batch(test_row(3, "AlsoDelete", 22), xact_id1)?;

-    // Flush stream 1
-    let disk_slice1 =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id1, Some(10))
-            .await
-            .expect("Stream 1 disk slice should be present");
-
     // Delete all rows in stream 1 (making batches empty after filtering)
     table
         .delete_in_stream_batch(test_row(2, "ToDelete", 21), xact_id1)
         .await;
@@ -2306,8 +2328,18 @@ async fn test_streaming_empty_batch_filtering() -> Result<()> {
         .delete_in_stream_batch(test_row(3, "AlsoDelete", 22), xact_id1)
         .await;

+    // Flush stream 1
+    let disk_slice1 = flush_stream_and_sync_no_apply(
+        &mut table,
+        &mut event_completion_rx,
+        xact_id1,
+        /*lsn=*/ Some(11),
+    )
+    .await
+    .expect("Stream 1 disk slice should be present");
+
     // Commit stream 1
-    table.commit_transaction_stream_impl(xact_id1, 11)?;
+    table.commit_transaction_stream_impl(xact_id1, /*lsn=*/ 11)?;

     table.apply_stream_flush_result(
         xact_id1,
         disk_slice1,
@@ -2368,7 +2400,7 @@ async fn test_batch_id_removal_assertion_direct() -> Result<()> {
     // This creates a disk slice that contains the original batch ID
     // but the batch will be filtered out during processing (empty after deletion)
     let disk_slice =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(10))
+        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(11))
             .await
             .expect("Disk slice should be present");

@@ -2507,7 +2539,7 @@ async fn test_stream_commit_with_ongoing_flush_deletion_remapping() -> Result<()> {
     // Step 4: Flush the streaming transaction (creates disk slice)
     // The disk slice now contains the deletion remapping info
     let disk_slice =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(4))
+        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, Some(3))
             .await
             .expect("Disk slice should be present");

@@ -2796,10 +2828,14 @@ async fn test_streaming_commit_before_flush_finishes_sets_flush_lsn() -> Result<()> {
     table.append_in_stream_batch(row, xact_id)?;

     // Begin a streaming flush WITHOUT a writer LSN to simulate a periodic flush before commit
-    let disk_slice =
-        flush_stream_and_sync_no_apply(&mut table, &mut event_completion_rx, xact_id, None)
-            .await
-            .expect("Disk slice should be present");
+    let disk_slice = flush_stream_and_sync_no_apply(
+        &mut table,
+        &mut event_completion_rx,
+        xact_id,
+        Some(commit_lsn),
+    )
+    .await
+    .expect("Disk slice should be present");

     // Commit the transaction while the flush is still pending
     table
@@ -2910,8 +2946,8 @@ async fn test_streaming_batch_id_assignment() -> Result<()> {
         .unwrap();

     // Verify both streaming LSNs are tracked
-    assert!(table.ongoing_flush_lsns.contains(&100));
-    assert!(table.ongoing_flush_lsns.contains(&50));
+    assert!(table.ongoing_flush_lsns.contains_key(&100));
+    assert!(table.ongoing_flush_lsns.contains_key(&50));
     assert_eq!(table.get_min_ongoing_flush_lsn(), 50);

     // Complete streaming flushes
diff --git a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs
index 9b07778fb..756b1b1d5 100644
--- a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs
+++ b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs
@@ -417,12 +417,6 @@ impl MooncakeTable {
             .unwrap();
         }

-        // Perform table flush completion notification.
-        if let Some(lsn) = disk_slice.lsn() {
-            self.remove_ongoing_flush_lsn(lsn);
-            self.try_set_next_flush_lsn(lsn);
-        }
-
         let stream_state = self
             .transaction_stream_states
             .get_mut(&xact_id)
@@ -446,6 +440,7 @@ impl MooncakeTable {
         let should_remove = stream_state.ongoing_flush_count == 0;
         let _ = stream_state;

+        self.remove_ongoing_flush_lsn(commit_lsn);
         self.try_set_next_flush_lsn(commit_lsn);
         self.next_snapshot_task.new_disk_slices.push(disk_slice);
         if should_remove {
@@ -507,6 +502,9 @@ impl MooncakeTable {
         }
     }

+    /// # Arguments
+    ///
+    /// * lsn: commit LSN for the current streaming transaction, if assigned.
     pub fn flush_stream(
         &mut self,
         xact_id: u32,
@@ -538,7 +536,21 @@ impl MooncakeTable {

         stream_state.ongoing_flush_count += 1;

-        self.flush_disk_slice(&mut disk_slice, table_notify_tx, Some(xact_id), event_id);
+        // For streaming transactions, only record the ongoing flush when a commit LSN
+        // is assigned; before commit, the transaction is invisible to the outside world.
+        let ongoing_flush_count = if lsn.is_some() {
+            stream_state.ongoing_flush_count
+        } else {
+            0
+        };
+
+        self.flush_disk_slice(
+            &mut disk_slice,
+            table_notify_tx,
+            Some(xact_id),
+            ongoing_flush_count,
+            event_id,
+        );

         // Add back stream state
         self.transaction_stream_states.insert(xact_id, stream_state);
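To summarize the streaming side of the change, here is a toy model of the accounting as I read this diff (invented types; the real logic spans `flush_stream`, `flush_disk_slice`, and the stream-flush apply path): pre-commit flushes bump the per-transaction counter but record nothing globally; the commit-time flush publishes the whole counter under the commit LSN; each applied flush result then releases one unit.

```rust
use std::collections::{btree_map::Entry, BTreeMap};

/// Per-transaction stream state (only the field relevant here).
struct StreamState {
    ongoing_flush_count: u32,
}

/// Stand-in for the table-level map; illustrative only.
#[derive(Default)]
struct FlushTracker {
    ongoing_flush_lsns: BTreeMap<u64, u32>,
}

impl FlushTracker {
    /// A streaming flush is issued; `commit_lsn` is Some only for the
    /// commit-time flush.
    fn on_stream_flush(&mut self, state: &mut StreamState, commit_lsn: Option<u64>) {
        state.ongoing_flush_count += 1;
        if let Some(lsn) = commit_lsn {
            // Publish ALL of the transaction's in-flight flushes under the
            // commit LSN at once (the `ongoing_flush_count` argument that
            // flush_stream threads into flush_disk_slice above).
            *self.ongoing_flush_lsns.entry(lsn).or_insert(0) += state.ongoing_flush_count;
        }
        // With commit_lsn == None nothing is recorded: a periodic pre-commit
        // flush stays invisible to snapshot gating.
    }

    /// A flush result is applied after commit: release one unit of the
    /// commit LSN's count.
    fn on_stream_flush_applied(&mut self, state: &mut StreamState, commit_lsn: u64) {
        state.ongoing_flush_count -= 1;
        match self.ongoing_flush_lsns.entry(commit_lsn) {
            Entry::Occupied(mut entry) => {
                if *entry.get() > 1 {
                    *entry.get_mut() -= 1;
                } else {
                    entry.remove();
                }
            }
            Entry::Vacant(_) => panic!("LSN {commit_lsn} is not tracked"),
        }
    }
}
```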