Conversation
📝 Walkthrough
Sequence Diagram

```mermaid
sequenceDiagram
    participant Node
    participant PIS as ParallelInfoService
    participant DBS as DagBlockSender
    participant API as SyncAPI
    Node->>PIS: Initialize & Register Service
    activate PIS
    DBS->>PIS: register_worker(worker_id)
    note over PIS: Track: worker_id → synced_blocks=0
    DBS->>DBS: Execute block via worker
    DBS->>PIS: report_worker_synced_block(worker_id)
    note over PIS: Increment synced_blocks for worker<br/>Generate snapshot with totals
    API->>PIS: get_parallel_sync_stat()
    activate PIS
    PIS-->>API: ParallelSyncStat {workers, total_count}
    deactivate PIS
    API-->>Node: SyncProgressReport {parallel: Some(...)}
    DBS->>PIS: unregister_worker(worker_id)
    note over PIS: Cleanup worker entry
```
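The register/report/query/unregister lifecycle in the diagram can be sketched as plain Rust. This is a simplified, hypothetical model of the PR's `ParallelInfoService` (the real one runs as an actor and receives these calls as messages), mirroring the `entry(...).or_insert(0)` counting shown in the review below:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the actor's internal state.
#[derive(Default)]
struct ParallelInfoService {
    worker_synced_blocks: BTreeMap<u32, u64>,
}

// Simplified snapshot type modeled on the PR's ParallelSyncStat.
#[derive(Debug, PartialEq)]
struct ParallelSyncStat {
    worker_count: usize,
    total_synced_block_count: u64,
}

impl ParallelInfoService {
    fn register_worker(&mut self, worker_id: u32) {
        // Track: worker_id -> synced_blocks = 0
        self.worker_synced_blocks.insert(worker_id, 0);
    }

    fn report_worker_synced_block(&mut self, worker_id: u32) {
        // Mirrors the handler quoted in the review: create-or-increment.
        let count = self.worker_synced_blocks.entry(worker_id).or_insert(0);
        *count = count.saturating_add(1);
    }

    fn get_parallel_sync_stat(&self) -> ParallelSyncStat {
        ParallelSyncStat {
            worker_count: self.worker_synced_blocks.len(),
            total_synced_block_count: self.worker_synced_blocks.values().sum(),
        }
    }

    fn unregister_worker(&mut self, worker_id: u32) {
        // Cleanup worker entry
        self.worker_synced_blocks.remove(&worker_id);
    }
}
```

The snapshot is derived on demand from the per-worker map, so no separate running total needs to stay in sync.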
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~30 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f4d3c73f3e
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you:
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```rust
let synced_block_count = self.worker_synced_blocks.entry(msg.worker_id).or_insert(0);
*synced_block_count = synced_block_count.saturating_add(1);
```
Reject sync reports from unknown worker IDs
`ReportWorkerSyncedBlockRequest` currently creates a new entry when the worker ID is missing, which makes the stats vulnerable to out-of-order actor messages: worker IDs are reused from 0 in each `DagBlockSender` instance (`sync/src/parallel/sender.rs`, `next_worker_id: 0`), and senders are recreated repeatedly during sync (`sync/src/tasks/block_sync_task.rs`, `DagBlockSender::new(...)`). A delayed report from an old sender can therefore resurrect or inflate a worker entry in a new cycle and corrupt the parallel progress output.
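A minimal sketch of the suggested guard, using hypothetical simplified types (the real handler lives in the service's message handling): increment only for registered workers instead of resurrecting entries with `or_insert`.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the service state; worker IDs enter the map
// only via register_worker.
struct WorkerStats {
    worker_synced_blocks: HashMap<u32, u64>,
}

impl WorkerStats {
    // Returns false for unknown (stale or already-unregistered) worker IDs
    // instead of creating a fresh entry, so a delayed report from a previous
    // DagBlockSender cycle cannot inflate the stats.
    fn report_worker_synced_block(&mut self, worker_id: u32) -> bool {
        match self.worker_synced_blocks.get_mut(&worker_id) {
            Some(count) => {
                *count = count.saturating_add(1);
                true
            }
            None => false,
        }
    }
}
```

The `bool` return is illustrative; the real handler could instead log and drop the stale message.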
```rust
Self::report_worker_synced_block(
    &self.parallel_info_service,
    worker.worker_id,
);
self.notifier.notify(*executed_block)?;
```
Count blocks only after notify succeeds
In `wait_for_finish`, the worker stat is incremented before `self.notifier.notify(...)` is executed; if `notify` returns an error (for example, a storage/connect failure), sync aborts but the block has already been counted as synced, so the reported worker totals become inaccurate on failure paths. This differs from `flush_executor_state`, which reports only after a successful notify.
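The suggested ordering can be sketched with a stand-in fallible `notify` (hypothetical names, not the PR's actual signatures): propagate the error first, count only on success.

```rust
// Stand-in for the notifier; the real one can fail on storage/connect errors.
fn notify(succeed: bool) -> Result<(), String> {
    if succeed {
        Ok(())
    } else {
        Err("connect failure".to_string())
    }
}

// Hypothetical shape of the fixed wait_for_finish step: notify first, then
// increment the per-worker stat, matching flush_executor_state's ordering.
fn on_executed_block(synced_blocks: &mut u64, notify_succeeds: bool) -> Result<(), String> {
    notify(notify_succeeds)?; // abort before counting on failure paths
    *synced_blocks = synced_blocks.saturating_add(1);
    Ok(())
}
```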
🧹 Nitpick comments (2)
sync/api/src/lib.rs (1)
137-141: Change `worker_count` to a fixed-width integer type for stable wire-level behavior.

`worker_count: usize` is platform-width dependent in this serialized public API type. Use `u64` for a consistent schema and serialization across 32-bit and 64-bit builds. Update the assignment in `sync/src/parallel/parallel_info_service.rs:33` to cast `workers.len()` appropriately (e.g., `worker_count: workers.len() as u64`).

Suggested change:

```diff
 #[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
 pub struct ParallelSyncStat {
-    pub worker_count: usize,
+    pub worker_count: u64,
     pub total_synced_block_count: u64,
     pub workers: Vec<ParallelWorkerSyncStat>,
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@sync/api/src/lib.rs` around lines 137-141: change the field type of `ParallelSyncStat::worker_count` from `usize` to `u64` to ensure stable wire-level serialization; update the struct declaration for `ParallelSyncStat` (which contains `worker_count`, `total_synced_block_count`, and `workers: Vec<ParallelWorkerSyncStat>`), then update the place where the struct is constructed in the parallel info service (the code that sets `worker_count` from `workers.len()`) to cast the length to `u64` (e.g., `worker_count: workers.len() as u64`).

sync/src/parallel/sender.rs (1)
355-379: Consider extracting shared cleanup logic.

The `abort_workers` method and the `Drop` implementation have identical cleanup logic. While the duplication is safe (due to the `registered` guard in `mark_worker_closed`), consider extracting it into a shared helper method for maintainability.

♻️ Suggested refactor to reduce duplication:

```diff
+    fn cleanup_worker(
+        parallel_info_service: &Option<ServiceRef<ParallelInfoService>>,
+        free_worker_ids: &mut BinaryHeap<Reverse<ParallelWorkerId>>,
+        worker: &mut DagBlockWorker,
+    ) {
+        let _ = worker.sender_to_executor.try_send(None);
+        worker.handle.abort();
+        Self::mark_worker_closed(parallel_info_service, free_worker_ids, worker);
+    }
+
     fn abort_workers(&mut self) {
         for worker in &mut self.executors {
-            let _ = worker.sender_to_executor.try_send(None);
-            worker.handle.abort();
-            Self::mark_worker_closed(
+            Self::cleanup_worker(
                 &self.parallel_info_service,
                 &mut self.free_worker_ids,
                 worker,
             );
         }
     }
 }

 impl Drop for DagBlockSender<'_> {
     fn drop(&mut self) {
         for worker in &mut self.executors {
-            let _ = worker.sender_to_executor.try_send(None);
-            worker.handle.abort();
-            Self::mark_worker_closed(
+            Self::cleanup_worker(
                 &self.parallel_info_service,
                 &mut self.free_worker_ids,
                 worker,
             );
         }
     }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@sync/src/parallel/sender.rs` around lines 355 - 379, The cleanup loop in abort_workers and in the Drop impl for DagBlockSender is duplicated; extract that loop into a private helper method (e.g., fn cleanup_executors(&mut self)) that iterates self.executors and performs the same steps (sender_to_executor.try_send(None), worker.handle.abort(), and calling Self::mark_worker_closed(&self.parallel_info_service, &mut self.free_worker_ids, worker)), and then call this helper from both abort_workers and Drop to remove duplication while preserving behavior.
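As a standalone illustration of that refactor, the following compiles on its own with simplified stand-ins (a `closed` flag plays the role of the `registered` guard, and a `Vec` stands in for `BinaryHeap<Reverse<ParallelWorkerId>>`; these are not the PR's actual types):

```rust
// Simplified stand-ins for DagBlockWorker and the free-ID heap.
struct Worker {
    id: u32,
    closed: bool,
}

struct Sender {
    executors: Vec<Worker>,
    free_worker_ids: Vec<u32>,
}

impl Sender {
    // Shared cleanup called from both abort_workers and Drop; the guard keeps
    // the duplicated invocation idempotent.
    fn cleanup_worker(free_worker_ids: &mut Vec<u32>, worker: &mut Worker) {
        if !worker.closed {
            worker.closed = true; // stand-in for try_send(None) + handle.abort()
            free_worker_ids.push(worker.id);
        }
    }

    fn abort_workers(&mut self) {
        for worker in &mut self.executors {
            Self::cleanup_worker(&mut self.free_worker_ids, worker);
        }
    }
}

impl Drop for Sender {
    fn drop(&mut self) {
        for worker in &mut self.executors {
            Self::cleanup_worker(&mut self.free_worker_ids, worker);
        }
    }
}
```

Both call sites shrink to a single helper call, and the guard ensures that `abort_workers` followed by `Drop` releases each worker ID exactly once.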
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@sync/api/src/lib.rs`:
- Around line 137-141: Change the field type of ParallelSyncStat::worker_count
from usize to u64 to ensure stable wire-level serialization; update the struct
declaration for ParallelSyncStat (which contains worker_count,
total_synced_block_count, and workers: Vec<ParallelWorkerSyncStat>) and then
update the place where the struct is constructed in the parallel info service
(the code that sets worker_count from workers.len()) to cast the length to u64
(e.g., worker_count: workers.len() as u64).
In `@sync/src/parallel/sender.rs`:
- Around line 355-379: The cleanup loop in abort_workers and in the Drop impl
for DagBlockSender is duplicated; extract that loop into a private helper method
(e.g., fn cleanup_executors(&mut self)) that iterates self.executors and
performs the same steps (sender_to_executor.try_send(None),
worker.handle.abort(), and calling
Self::mark_worker_closed(&self.parallel_info_service, &mut self.free_worker_ids,
worker)), and then call this helper from both abort_workers and Drop to remove
duplication while preserving behavior.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- node/src/node.rs
- sync/api/src/lib.rs
- sync/src/parallel/mod.rs
- sync/src/parallel/parallel_info_service.rs
- sync/src/parallel/sender.rs
- sync/src/parallel/tests.rs
- sync/src/sync.rs
- sync/src/tasks/block_sync_task.rs
- sync/src/tasks/inner_sync_task.rs
- sync/src/tasks/mod.rs
Pull request type
Please check the type of change your PR introduces:
What is the current behavior?
Issue Number: N/A
What is the new behavior?
Other information
Summary by CodeRabbit
New Features