Catch up MLS backlog before group sends in app path #513
reez wants to merge 1 commit into sledtools:master from
Conversation
📝 Walkthrough

Adds a backlog-aware group rumor publish flow: publishing a prepared group rumor now fetches backlog events from selected relays, ingests them (up to three passes, deferring unprocessable items), persists processed MLS event IDs to avoid reprocessing, and uses a new internal event to resume the final publish.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Client/Publisher
    participant ChatMedia as ChatMedia
    participant Relays as Relays/Network
    participant Backlog as Backlog Fetcher
    participant MDK as MDK/Processor
    participant AppCore as AppCore State
    Client->>ChatMedia: publish_prepared_group_rumor(chat_id, rumor)
    ChatMedia->>Relays: determine selected relays
    ChatMedia->>Backlog: spawn async backlog fetch (selected relays)
    Backlog->>Relays: request backlog events
    Relays-->>Backlog: return backlog_events or error
    Backlog->>ChatMedia: emit GroupSendCatchupCompleted(chat_id, rumor, backlog_events, error)
    ChatMedia->>MDK: ingest_group_backlog_events(backlog_events) (up to 3 passes)
    MDK->>AppCore: deliver/handle events
    AppCore->>AppCore: note_processed_group_event_id(event_id) & persist set
    ChatMedia->>Relays: re-publish prepared rumor after ingestion
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rust/src/core/chat_media.rs`:
- Around line 1908-1915: The early returns when self.session is None or group
missing must mark the previously-inserted optimistic send as failed: before each
return in the block that currently checks let Some(sess) = self.session.as_mut()
and let Some(group) = sess.groups.get(&chat_id).cloned(), update the pending
entry keyed by rumor_id_hex in self.local_outbox to a failure state and write a
corresponding failure entry into self.delivery_overrides (include error metadata
or a retry marker consistent with the existing failure format), then proceed to
call self.toast and return; reference self.local_outbox,
self.delivery_overrides, rumor_id_hex, self.session and sess.groups to locate
and update the entries.
In `@rust/src/core/mod.rs`:
- Around line 774-775: The dedupe cache processed_group_event_ids is surviving
MLS state resets because its in-memory HashSet is not cleared when the MDK DB or
MLS store is wiped; update the reset/logout/wipe_local_data() flows to either
(a) scope processed_group_event_ids to the current account/MLS store (e.g.,
keyed by account id or MLS store identifier) so a new login uses a fresh set, or
(b) explicitly clear the in-memory HashSet and delete its backing file whenever
profile_db / MDK DB or MLS state is reset (logout, reprovision, or
wipe_local_data()); locate uses of processed_group_event_ids and
wipe_local_data() and ensure both memory and disk are cleared in the same code
paths so catch-up won’t skip commits after a fresh MLS store is created.
- Around line 4483-4493: The in-memory processed-event cache
(processed_group_event_ids) in note_processed_group_event_id grows unbounded
because persist_processed_mls_event_ids only truncates the serialized file;
change the in-memory structure to track recency (e.g. maintain a
HashSet<EventId> plus a VecDeque<EventId> or use an LRU structure) and enforce a
MAX_PROCESSED_IDS cap on insert: when inserting in
note_processed_group_event_id, if the ID already exists move it to the back of
the recency queue, otherwise push_back and insert into the set, and while the
queue length > MAX_PROCESSED_IDS pop_front and remove that ID from the set so
memory and disk eviction are consistent; after these bounded updates still call
persist_processed_mls_event_ids with self.data_dir and the trimmed set so both
in-memory and on-disk representations match.
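The bounded cache the prompt describes can be sketched as a `HashSet` paired with a `VecDeque` for recency tracking. This is a minimal illustration under assumed names, not the app's actual code: `BoundedSeen`, `MAX_PROCESSED_IDS`, and the `u64` ids are placeholders (the real cache stores `EventId` values and persists them via `persist_processed_mls_event_ids`).

```rust
use std::collections::{HashSet, VecDeque};

// Tiny cap so eviction is visible in the demo; the prompt suggests
// something like 20_000 in production.
const MAX_PROCESSED_IDS: usize = 4;

struct BoundedSeen {
    set: HashSet<u64>,
    order: VecDeque<u64>, // front = oldest, back = most recent
}

impl BoundedSeen {
    fn new() -> Self {
        Self { set: HashSet::new(), order: VecDeque::new() }
    }

    /// Returns true if the id was newly inserted.
    fn note(&mut self, id: u64) -> bool {
        if self.set.contains(&id) {
            // Already seen: refresh recency by moving it to the back.
            if let Some(pos) = self.order.iter().position(|x| *x == id) {
                self.order.remove(pos);
            }
            self.order.push_back(id);
            return false;
        }
        self.set.insert(id);
        self.order.push_back(id);
        // Evict oldest entries from both structures so memory and the
        // (hypothetical) persisted copy stay consistent.
        while self.order.len() > MAX_PROCESSED_IDS {
            if let Some(old) = self.order.pop_front() {
                self.set.remove(&old);
            }
        }
        true
    }
}
```

After each bounded `note`, the real code would still persist the trimmed set, so disk never holds ids that memory has already evicted.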
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3966deb9-ee3e-429d-a9be-6451e35cd0f4
📒 Files selected for processing (3)
- rust/src/core/chat_media.rs
- rust/src/core/mod.rs
- rust/src/updates.rs
```rust
let (mut backlog_events, error) = match client
    .fetch_events_from(relays.clone(), filter, std::time::Duration::from_secs(8))
    .await
{
    Ok(evts) => (evts.into_iter().collect::<Vec<Event>>(), None),
    Err(e) => (Vec::new(), Some(format!("group backlog fetch failed: {e}"))),
```
Don't continue to create_message() after a failed catch-up.
Line 1837 converts backlog fetch failure into error, but Lines 1860-1864 only log it and continue. Lines 1899-1901 do the same when backlog ingestion still leaves events behind. That means publish_prepared_group_rumor() can still encrypt from a known-incomplete MLS state, which is the exact stale-epoch failure mode this PR is trying to avoid.
Suggested direction
```diff
 pub(super) fn handle_group_send_catchup_completed(
     &mut self,
     chat_id: String,
     rumor: UnsignedEvent,
     backlog_events: Vec<Event>,
     error: Option<String>,
 ) {
+    let rumor_id_hex = rumor.id().to_hex();
     if let Some(err) = error {
-        tracing::warn!(%chat_id, %err, "group send backlog fetch failed");
+        let delivery = MessageDeliveryState::Failed {
+            reason: format!("group backlog fetch failed: {err}"),
+        };
+        self.delivery_overrides
+            .entry(chat_id.clone())
+            .or_default()
+            .insert(rumor_id_hex.clone(), delivery.clone());
+        self.fail_delivery_or_refresh(&chat_id, &rumor_id_hex, delivery);
+        self.refresh_chat_list_from_storage();
+        return;
     }
-    self.ingest_group_backlog_events(backlog_events);
+    if !self.ingest_group_backlog_events(backlog_events) {
+        let delivery = MessageDeliveryState::Failed {
+            reason: "group backlog catch-up incomplete".into(),
+        };
+        self.delivery_overrides
+            .entry(chat_id.clone())
+            .or_default()
+            .insert(rumor_id_hex.clone(), delivery.clone());
+        self.fail_delivery_or_refresh(&chat_id, &rumor_id_hex, delivery);
+        self.refresh_chat_list_from_storage();
+        return;
+    }
     self.publish_prepared_group_rumor(chat_id, rumor);
 }

-fn ingest_group_backlog_events(&mut self, backlog_events: Vec<Event>) {
+fn ingest_group_backlog_events(&mut self, backlog_events: Vec<Event>) -> bool {
     let mut remaining = backlog_events;
     for _ in 0..3 {
         if remaining.is_empty() {
             break;
         }
@@
-        let Some(sess) = self.session.as_mut() else {
-            return;
+        let Some(sess) = self.session.as_mut() else {
+            return false;
         };
         sess.mdk.process_message(&event)
     };
@@
-    if !remaining.is_empty() {
-        tracing::warn!(remaining = remaining.len(), "failed to ingest some backlog events");
-    }
+    if !remaining.is_empty() {
+        tracing::warn!(remaining = remaining.len(), "failed to ingest some backlog events");
+        return false;
+    }
+    true
 }
```

Also applies to: 1860-1864, 1899-1901
Good idea, but this PR is intentionally matching the CLI's current best-effort catch-up behavior (parity) rather than introducing a new fail-closed policy.
But it could be a good follow-up.
@reez, understood — keeping best-effort parity with the CLI makes sense as an incremental step. Would you like me to open a follow-up issue to track the fail-closed catch-up policy (failing the send when backlog fetch errors or ingestion is incomplete), so it doesn't get lost?
✏️ Learnings added
Learnt from: reez
Repo: sledtools/pika PR: 513
File: rust/src/core/chat_media.rs:1832-1837
Timestamp: 2026-03-08T21:14:08.184Z
Learning: In `rust/src/core/chat_media.rs`, the `handle_group_send_catchup_completed` / `ingest_group_backlog_events` flow intentionally uses best-effort (log-and-continue) catch-up behavior to match CLI parity. Failing the send on backlog fetch error or incomplete ingestion (fail-closed policy) is explicitly deferred to a future follow-up PR. Do not flag the log-and-continue pattern as a bug in this context.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: futurepaul
Repo: sledtools/pika PR: 429
File: rust/src/core/chat_media.rs:1257-1262
Timestamp: 2026-03-05T00:37:01.166Z
Learning: In `rust/src/core/chat_media.rs`, within `handle_batch_upload_completed`, the early returns when `self.session` is unavailable (around the `let Some(sess) = self.session.as_ref() else { return; }` check) and when `sess.local_keys` is unavailable do not transition the batch to a Failed state, which can strand the batch in pending/uploading. The fix — marking the batch as Failed and cleaning up outbox/delivery_overrides — is intentionally deferred to a follow-up PR alongside broader batch error-handling hardening (same class as the missing-URL/hash early returns in the same function).
Learnt from: futurepaul
Repo: sledtools/pika PR: 429
File: rust/src/core/chat_media.rs:874-895
Timestamp: 2026-03-05T00:37:03.771Z
Learning: In rust/src/core/chat_media.rs, within send_chat_media_batch, when the MDK re-encodes media and the local preview copy to final_local_path fails, the code should fall back to pp.local_path instead of setting att.local_path to None. This avoids temporarily losing the thumbnail in the outbox entry due to a cosmetic issue since uploads use encrypted_data in memory. Apply this fallback pattern in similar Rust code paths where a file copy may fail but an existing local_path could still be valid. This is a known follow-up and should be implemented as a defensive fallback.
```rust
fn note_processed_group_event_id(&mut self, event_id: EventId) {
    if !self.processed_group_event_ids.insert(event_id) {
        return;
    }
    if let Err(e) = pika_marmot_runtime::persist_processed_mls_event_ids(
        std::path::Path::new(&self.data_dir),
        &self.processed_group_event_ids,
    ) {
        tracing::warn!(%e, "failed to persist processed group event ids");
    }
}
```
Bound the processed-event cache by recency in memory too.
persist_processed_mls_event_ids() only trims the serialized output, and it does it after lexically sorting IDs. This HashSet therefore grows without bound for the life of the process, while the on-disk file can evict arbitrary IDs instead of the most recent ones. On a long-lived session, every group message turns into a larger sort/write than the last and the duplicate filter gets weaker once the cap is hit. Please track insertion order (for example HashSet + VecDeque/LRU) and evict from both memory and disk consistently.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/src/core/mod.rs` around lines 4483 - 4493, The in-memory processed-event
cache (processed_group_event_ids) in note_processed_group_event_id grows
unbounded because persist_processed_mls_event_ids only truncates the serialized
file; change the in-memory structure to track recency (e.g. maintain a
HashSet<EventId> plus a VecDeque<EventId> or use an LRU structure) and enforce a
MAX_PROCESSED_IDS cap on insert: when inserting in
note_processed_group_event_id, if the ID already exists move it to the back of
the recency queue, otherwise push_back and insert into the set, and while the
queue length > MAX_PROCESSED_IDS pop_front and remove that ID from the set so
memory and disk eviction are consistent; after these bounded updates still call
persist_processed_mls_event_ids with self.data_dir and the trimmed set so both
in-memory and on-disk representations match.
Sure, but I see this as follow-up hardening, not a blocker for this parity fix.
There was a problem hiding this comment.
@reez Got it — agreed to treat this as follow-up hardening and not block this parity fix.
If you’d like, I can open a tracking issue with the following scope so it’s queued up:
- Problem: processed_group_event_ids (in-memory) is unbounded; disk pruning alone weakens duplicate filtering and increases write costs over long sessions.
- Proposed approach:
- Add a small recency structure (e.g., VecDeque) alongside the HashSet.
- Define a cap like MAX_PROCESSED_IDS (e.g., 20_000; tunable via config/env).
- On insert: if new, push_back into deque and insert into set; if existing, move it to the back; while len > cap, pop_front and remove from set.
- Persist only the bounded set, keeping memory and disk in sync.
- Acceptance criteria:
- Memory stays bounded under heavy message load.
- Duplicate suppression behavior remains correct across restarts.
- Unit tests cover eviction order and load-from-disk trimming.
Preference on the default cap? I can proceed with opening the issue now if you confirm.
✏️ Learnings added
Learnt from: reez
Repo: sledtools/pika PR: 513
File: rust/src/core/mod.rs:4484-4494
Timestamp: 2026-03-08T21:35:07.708Z
Learning: Bounding the in-memory processed_group_event_ids cache (e.g., via LRU/recency queue with a MAX_PROCESSED_IDS cap) is acknowledged as follow-up hardening and is not a blocker for the MLS backlog parity fix in this PR.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: reez
Repo: sledtools/pika PR: 513
File: rust/src/core/chat_media.rs:1832-1837
Timestamp: 2026-03-08T21:14:08.184Z
Learning: In `rust/src/core/chat_media.rs`, the `handle_group_send_catchup_completed` / `ingest_group_backlog_events` flow intentionally uses best-effort (log-and-continue) catch-up behavior to match CLI parity. Failing the send on backlog fetch error or incomplete ingestion (fail-closed policy) is explicitly deferred to a future follow-up PR. Do not flag the log-and-continue pattern as a bug in this context.
Learnt from: futurepaul
Repo: sledtools/pika PR: 429
File: rust/src/core/chat_media.rs:1257-1262
Timestamp: 2026-03-05T00:37:01.166Z
Learning: In `rust/src/core/chat_media.rs`, within `handle_batch_upload_completed`, the early returns when `self.session` is unavailable (around the `let Some(sess) = self.session.as_ref() else { return; }` check) and when `sess.local_keys` is unavailable do not transition the batch to a Failed state, which can strand the batch in pending/uploading. The fix — marking the batch as Failed and cleaning up outbox/delivery_overrides — is intentionally deferred to a follow-up PR alongside broader batch error-handling hardening (same class as the missing-URL/hash early returns in the same function).
Learnt from: futurepaul
Repo: sledtools/pika PR: 429
File: rust/src/core/chat_media.rs:874-895
Timestamp: 2026-03-05T00:37:03.771Z
Learning: In rust/src/core/chat_media.rs, within send_chat_media_batch, when the MDK re-encodes media and the local preview copy to final_local_path fails, the code should fall back to pp.local_path instead of setting att.local_path to None. This avoids temporarily losing the thumbnail in the outbox entry due to a cosmetic issue since uploads use encrypted_data in memory. Apply this fallback pattern in similar Rust code paths where a file copy may fail but an existing local_path could still be valid. This is a known follow-up and should be implemented as a defensive fallback.
♻️ Duplicate comments (2)
rust/src/core/mod.rs (2)
774-775: ⚠️ Potential issue | 🟠 Major: Clear processed_group_event_ids whenever MLS state is reset.

This cache is loaded from data_dir, but the logout path at Line 4747 deletes the MDK DB without clearing the dedupe set, and wipe_local_data() only removes the backing file on disk. In a same-process relogin/reprovision flow, a fresh MLS store can still skip backlog commits that it has never ingested. Please either scope this cache to the active account/store or clear both memory and disk anywhere local MLS state is reset.

Also applies to: 891-925
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/src/core/mod.rs` around lines 774 - 775, The in-memory dedupe cache processed_group_event_ids must be cleared whenever local MLS state is reset or the backing DB is removed; update the reset/logout/reprovision flows to call a clear on processed_group_event_ids (and remove any persisted dedupe file if one exists) — e.g., modify the functions that wipe MLS state such as wipe_local_data() and the logout/reprovision handler to reset the HashSet processed_group_event_ids and ensure profile_db-related teardown also clears this cache so a new MLS store doesn’t skip commits it never ingested.
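A minimal model of the suggested fix: keep the in-memory set and its backing file behind one type so every MLS-reset path (logout, reprovision, wipe_local_data) can clear both together. All names here (`DedupeCache`, `note`, `clear`) are hypothetical stand-ins; the real cache lives on the app core, stores `EventId` values, and persists through `persist_processed_mls_event_ids`.

```rust
use std::collections::HashSet;
use std::fs;
use std::path::PathBuf;

// Hypothetical type pairing the dedupe set with its on-disk copy.
struct DedupeCache {
    ids: HashSet<String>,
    backing_file: PathBuf,
}

impl DedupeCache {
    fn note(&mut self, id: &str) {
        if self.ids.insert(id.to_string()) {
            // Best-effort persist of the sorted set; errors ignored in the sketch.
            let mut sorted: Vec<String> = self.ids.iter().cloned().collect();
            sorted.sort();
            let _ = fs::write(&self.backing_file, sorted.join("\n"));
        }
    }

    // Call from every path that resets MLS state so memory and disk
    // are cleared in the same place and a fresh store starts empty.
    fn clear(&mut self) {
        self.ids.clear();
        let _ = fs::remove_file(&self.backing_file);
    }
}
```

With this shape, the logout/reprovision handlers only need one `cache.clear()` call instead of separately remembering the set and the file.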
4483-4493: ⚠️ Potential issue | 🟠 Major: Bound the processed-event cache in memory too.

persist_processed_mls_event_ids() trims only the serialized output, and it does so after lexically sorting IDs. processed_group_event_ids therefore grows for the lifetime of the process, while the on-disk cap can evict arbitrary IDs instead of the most recent ones. Please track recency in memory and evict from both memory and disk on insert so the duplicate filter stays bounded and consistent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/src/core/mod.rs` around lines 4483 - 4493, The in-memory set processed_group_event_ids in note_processed_group_event_id grows unbounded because persist_processed_mls_event_ids only trims the serialized on-disk output; change note_processed_group_event_id to maintain a bounded, recency-ordered cache (e.g. an LRU or a VecDeque of EventId + a HashSet lookup) so that on each insert you evict the oldest entries from processed_group_event_ids when the cache exceeds the same cap used by persist_processed_mls_event_ids, then call persist_processed_mls_event_ids (with &self.data_dir) after performing any evictions so disk and memory remain consistent and the duplicate filter stays bounded.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@rust/src/core/mod.rs`:
- Around line 774-775: The in-memory dedupe cache processed_group_event_ids must
be cleared whenever local MLS state is reset or the backing DB is removed;
update the reset/logout/reprovision flows to call a clear on
processed_group_event_ids (and remove any persisted dedupe file if one exists) —
e.g., modify the functions that wipe MLS state such as wipe_local_data() and the
logout/reprovision handler to reset the HashSet processed_group_event_ids and
ensure profile_db-related teardown also clears this cache so a new MLS store
doesn’t skip commits it never ingested.
- Around line 4483-4493: The in-memory set processed_group_event_ids in
note_processed_group_event_id grows unbounded because
persist_processed_mls_event_ids only trims the serialized on-disk output; change
note_processed_group_event_id to maintain a bounded, recency-ordered cache (e.g.
an LRU or a VecDeque of EventId + a HashSet lookup) so that on each insert you
evict the oldest entries from processed_group_event_ids when the cache exceeds
the same cap used by persist_processed_mls_event_ids, then call
persist_processed_mls_event_ids (with &self.data_dir) after performing any
evictions so disk and memory remain consistent and the duplicate filter stays
bounded.
🚧 Files skipped from review as they are similar to previous changes (1)
- rust/src/updates.rs
♻️ Duplicate comments (1)
rust/src/core/mod.rs (1)
4484-4494: ⚠️ Potential issue | 🟠 Major: Bound the processed-event cache in memory too.

processed_group_event_ids still grows for the life of the process, while persist_processed_mls_event_ids() only trims the serialized copy after lexicographically sorting ids. On a long-lived session, every new event makes this synchronous sort/write more expensive, and memory keeps arbitrarily old ids anyway. Please enforce the cap on insert with recency tracking so memory and disk evict the same entries.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/src/core/mod.rs` around lines 4484 - 4494, The in-memory processed_group_event_ids is unbounded; change note_processed_group_event_id to enforce a cap by adding recency tracking (e.g., a VecDeque or similar order structure) and a constant MAX_PROCESSED_GROUP_EVENT_IDS, so when inserting a new EventId into processed_group_event_ids (in function note_processed_group_event_id) you push the id onto the recency queue and if the queue length exceeds MAX_PROCESSED_GROUP_EVENT_IDS pop the oldest id and remove it from the processed_group_event_ids HashSet; keep the existing persist_processed_mls_event_ids call (using self.data_dir and &self.processed_group_event_ids) so disk and memory are evicted consistently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@rust/src/core/mod.rs`:
- Around line 4484-4494: The in-memory processed_group_event_ids is unbounded;
change note_processed_group_event_id to enforce a cap by adding recency tracking
(e.g., a VecDeque or similar order structure) and a constant
MAX_PROCESSED_GROUP_EVENT_IDS, so when inserting a new EventId into
processed_group_event_ids (in function note_processed_group_event_id) you push
the id onto the recency queue and if the queue length exceeds
MAX_PROCESSED_GROUP_EVENT_IDS pop the oldest id and remove it from the
processed_group_event_ids HashSet; keep the existing
persist_processed_mls_event_ids call (using self.data_dir and
&self.processed_group_event_ids) so disk and memory are evicted consistently.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rust/src/core/mod.rs (1)
4465-4489: ⚠️ Potential issue | 🟠 Major: Don't persist Unprocessable events as already handled.

note_processed_group_event_id(event_id) runs for every successful process_message() call, including MessageProcessingResult::Unprocessable { .. }. Those are exactly the events that can become ingestible on a later pass once earlier commits land, so caching them here can make backlog catch-up skip them permanently and leave the app stuck on the wrong MLS epoch. At minimum, gate the cache write so Unprocessable results are not recorded yet.

Suggested fix
```diff
 pub(crate) fn handle_group_message(&mut self, event: Event) {
     let event_id = event.id;
     let result = {
         let Some(sess) = self.session.as_mut() else {
             tracing::warn!("group_message but no session");
@@
         }
     };
-    self.note_processed_group_event_id(event_id);
+    if !matches!(&result, MessageProcessingResult::Unprocessable { .. }) {
+        self.note_processed_group_event_id(event_id);
+    }
     self.handle_message_processing_result(result);
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/src/core/mod.rs` around lines 4465 - 4489, The code currently calls note_processed_group_event_id(event_id) for every successful sess.mdk.process_message(&event) result, which causes MessageProcessingResult::Unprocessable events to be recorded in processed_group_event_ids and persisted; instead, change the flow so you only call note_processed_group_event_id (and thus persist_processed_group_event_ids_cache) for results that are actually handled/progressing the state—e.g., inspect the result returned by process_message (the value passed into handle_message_processing_result) and skip calling note_processed_group_event_id when it matches MessageProcessingResult::Unprocessable { .. } while still passing the result into handle_message_processing_result.
🧹 Nitpick comments (1)
rust/src/core/chat_media.rs (1)
1879-1881: Consider persisting processed IDs before the early return.

If the session becomes None mid-ingestion, any events already processed in this pass (marked via note_processed_group_event_id_in_memory) won't be persisted, since line 1905 is never reached. On restart, those events would be reprocessed. Given the best-effort semantics and the edge-case nature (logout during catch-up), this is acceptable, but persisting before the early return would improve robustness.
Proposed fix
```diff
 let result = {
     let Some(sess) = self.session.as_mut() else {
+        if processed_any {
+            self.persist_processed_group_event_ids_cache();
+        }
         return;
     };
     sess.mdk.process_message(&event)
 };
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/src/core/chat_media.rs` around lines 1879 - 1881, Session check returns early when self.session is None, which prevents already-marked IDs (via note_processed_group_event_id_in_memory) from being flushed to durable storage; before returning from that early-exit in the ingestion function, persist the in-memory processed IDs. Concretely, in the block that checks self.session (the Some(sess) = self.session.as_mut() else { return; } branch), call the existing persistence flush/commit routine used for processed IDs (the same code path that runs later at line ~1905 after processing) so note_processed_group_event_id_in_memory entries are written out before returning — reference self.session and note_processed_group_event_id_in_memory to locate where to insert the flush.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@rust/src/core/mod.rs`:
- Around line 4465-4489: The code currently calls
note_processed_group_event_id(event_id) for every successful
sess.mdk.process_message(&event) result, which causes
MessageProcessingResult::Unprocessable events to be recorded in
processed_group_event_ids and persisted; instead, change the flow so you only
call note_processed_group_event_id (and thus
persist_processed_group_event_ids_cache) for results that are actually
handled/progressing the state—e.g., inspect the result returned by
process_message (the value passed into handle_message_processing_result) and
skip calling note_processed_group_event_id when it matches
MessageProcessingResult::Unprocessable { .. } while still passing the result
into handle_message_processing_result.
---
Nitpick comments:
In `@rust/src/core/chat_media.rs`:
- Around line 1879-1881: Session check returns early when self.session is None,
which prevents already-marked IDs (via note_processed_group_event_id_in_memory)
from being flushed to durable storage; before returning from that early-exit in
the ingestion function, persist the in-memory processed IDs. Concretely, in the
block that checks self.session (the Some(sess) = self.session.as_mut() else {
return; } branch), call the existing persistence flush/commit routine used for
processed IDs (the same code path that runs later at line ~1905 after
processing) so note_processed_group_event_id_in_memory entries are written out
before returning — reference self.session and
note_processed_group_event_id_in_memory to locate where to insert the flush.
```rust
let tx = self.core_sender.clone();
let mut seen_ids = self.processed_group_event_ids.clone();
let chat_id_for_task = chat_id.clone();
self.runtime.spawn(async move {
    let filter = Filter::new()
        .kind(Kind::MlsGroupMessage)
        .custom_tags(
            SingleLetterTag::lowercase(Alphabet::H),
            vec![chat_id_for_task.clone()],
        )
        .limit(200);
    let (mut backlog_events, error) = match client
        .fetch_events_from(relays.clone(), filter, std::time::Duration::from_secs(8))
        .await
    {
        Ok(evts) => (evts.into_iter().collect::<Vec<Event>>(), None),
        Err(e) => (Vec::new(), Some(format!("group backlog fetch failed: {e}"))),
    };
    backlog_events.retain(|ev| seen_ids.insert(ev.id));
    backlog_events.sort_by_key(|ev| ev.created_at.as_secs());

    let _ = tx.send(CoreMsg::Internal(Box::new(
        InternalEvent::GroupSendCatchupCompleted {
            chat_id: chat_id_for_task,
            rumor,
            backlog_events,
            error,
        },
    )));
});
```
🔴 Crash safety regression: encrypted wrapper not persisted until after 8-second async backlog fetch
In the old code, the rumor was encrypted via create_message and immediately persisted to pending_sends (SQLite-backed) before any async work began. On app crash, PendingSends::load at rust/src/core/mod.rs:881 would recover the wrapper for retry.
In the new code, publish_chat_message_with_tags spawns an async backlog fetch (up to 8 seconds, rust/src/core/chat_media.rs:1833) before encryption happens. The pending_sends.insert call has moved to publish_prepared_group_rumor at rust/src/core/chat_media.rs:1953, which only runs after the async catchup completes. If the app crashes during this window, the message is lost: local_outbox and delivery_overrides are in-memory only (lost on crash), and the wrapper was never persisted to pending_sends. The user's message silently vanishes with no way to recover or retry it.
Comparison of persistence timing
Old code persisted the encrypted wrapper synchronously before the async send:
create_message → pending_sends.insert (persisted) → spawn send
New code defers persistence past an 8-second async gap:
spawn backlog fetch (8s timeout) → [async gap] → create_message → pending_sends.insert
The crash-safety window expanded from near-zero to 8+ seconds.
Prompt for agents
In rust/src/core/chat_media.rs, the publish_chat_message_with_tags function (around line 1821-1850) now defers encryption and pending_sends persistence until after an 8-second async backlog fetch, creating a window where a crash would lose the user's message. To fix this, consider one of these approaches:
1. Persist the unencrypted rumor content to a new durable store (e.g. a "pending_outbox" SQLite table) before spawning the backlog fetch at line 1824. Then in publish_prepared_group_rumor (line 1909), remove it from the pending_outbox after successful pending_sends.insert. On app restart, re-create and re-encrypt any pending_outbox entries.
2. Alternatively, encrypt the message BEFORE the backlog fetch (as in the old code) and store the wrapper in pending_sends immediately. Then perform the backlog fetch as a fire-and-forget side-effect. This preserves crash safety but may mean the encrypted message uses a stale MLS epoch (which was presumably the reason for the refactor).
3. At minimum, document in a code comment that the backlog catchup window is intentionally not crash-safe, if this is an accepted tradeoff.
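Option 1 above can be sketched as a staged outbox that is written before the async backlog fetch begins and drained only after the durable pending_sends insert succeeds. `PendingOutbox` and its methods are hypothetical stand-ins; the real store would be a SQLite table so entries survive a crash (a `HashMap` is used here only to show the ordering of stage/complete/recover).

```rust
use std::collections::HashMap;

// Hypothetical durable staging area for unencrypted rumors.
struct PendingOutbox {
    // In the real app this would be SQLite-backed, not in-memory.
    entries: HashMap<String, String>, // rumor_id -> serialized rumor
}

impl PendingOutbox {
    // Called BEFORE spawning the backlog fetch, so a crash during the
    // 8-second window leaves a recoverable record.
    fn stage(&mut self, rumor_id: &str, rumor_json: &str) {
        self.entries.insert(rumor_id.to_string(), rumor_json.to_string());
    }

    // Called AFTER the encrypted wrapper is persisted to pending_sends.
    fn complete(&mut self, rumor_id: &str) {
        self.entries.remove(rumor_id);
    }

    // Called on startup: anything still staged never reached
    // pending_sends and should be re-encrypted and retried.
    fn recover(&self) -> Vec<(String, String)> {
        self.entries.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
    }
}
```

The key property is ordering, not the storage type: stage happens synchronously in the send path, so the crash-safety window returns to near-zero while still letting the catch-up run before encryption.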
```rust
fn ingest_group_backlog_events(&mut self, backlog_events: Vec<Event>) {
    let mut remaining = backlog_events;
    let mut processed_any = false;
    for _ in 0..3 {
        if remaining.is_empty() {
            break;
        }
        let current = std::mem::take(&mut remaining);
        let mut next = Vec::new();
        for event in current.into_iter() {
            let event_id = event.id;
            let result = {
                let Some(sess) = self.session.as_mut() else {
                    return;
                };
                sess.mdk.process_message(&event)
            };
            match result {
                Ok(r) => {
                    self.note_processed_group_event_id_in_memory(event_id);
                    processed_any = true;
                    self.handle_message_processing_result(r);
                }
                Err(e) => {
                    tracing::debug!(event_id = %event.id.to_hex(), %e, "deferred backlog message");
                    next.push(event);
                }
            }
        }
        if next.is_empty() {
            break;
        }
        remaining = next;
    }
    if !remaining.is_empty() {
        tracing::warn!(remaining = remaining.len(), "failed to ingest some backlog events");
    }
    if processed_any {
        self.persist_processed_group_event_ids_cache();
    }
}
```
🟡 No tests added for new backlog catchup logic
The AGENTS.md rule states: "Always add tests for changes when possible." This PR introduces three new non-trivial functions — handle_group_send_catchup_completed (line 1853), ingest_group_backlog_events (line 1867), and publish_prepared_group_rumor (line 1909) — along with four new helper methods for processed_group_event_ids management (rust/src/core/mod.rs:4484-4520). None of these have tests. At minimum, ingest_group_backlog_events (the multi-pass retry loop) and the note_processed_group_event_id/clear_processed_group_event_ids_cache helpers are unit-testable without complex mocking.
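As a starting point, the multi-pass deferral loop can be extracted into a pure function and unit-tested without MDK or relay mocks. The sketch below models a backlog where each event may depend on an earlier one; the function name, the dependency encoding, and the string ids are all invented for the test and are not the app's API.

```rust
use std::collections::HashSet;

/// Pure model of the three-pass retry loop: each event optionally names a
/// dependency, and processing succeeds only once that dependency has been
/// seen. Returns the ids still unprocessed after three passes.
fn ingest(events: Vec<(&'static str, Option<&'static str>)>) -> Vec<&'static str> {
    let mut seen: HashSet<&str> = HashSet::new();
    let mut remaining = events;
    for _ in 0..3 {
        if remaining.is_empty() {
            break;
        }
        let current = std::mem::take(&mut remaining);
        let mut next = Vec::new();
        for (id, dep) in current {
            match dep {
                // Dependency not yet ingested: defer to the next pass.
                Some(d) if !seen.contains(d) => next.push((id, dep)),
                // No dependency, or dependency satisfied: process now.
                _ => {
                    seen.insert(id);
                }
            }
        }
        if next.is_empty() {
            break;
        }
        remaining = next;
    }
    remaining.into_iter().map(|(id, _)| id).collect()
}
```

A test can then assert that an out-of-order chain resolves within three passes while an event with a missing dependency is reported as leftover, which pins down the retry semantics before any refactor.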
Makes app group sends catch up recent MLS backlog before create_message(). The CLI already does this; the app send path did not. In fresh post-join flows that could leave the app on a stale MLS epoch and cause receiver-side decrypt failures on the first app-sent group message.

This patch brings app send behavior in line with the CLI by: