receiver: wait for batched FDs instead of returning MismatchedCount#9
Conversation
There was a problem hiding this comment.
Code Review
This pull request enhances the UnixSocketTransport to handle batched file descriptors by introducing a pending_message buffer in the Receiver. This prevents MismatchedCount errors when JSON messages are parsed before all associated FDs have arrived. The review feedback suggests optimizing the retrieval of pending messages using Option::take to avoid expensive clones and recommends refactoring duplicated message construction logic into a private helper method.
src/transport.rs
Outdated
| if let Some((ref value, fd_count)) = self.pending_message { | ||
| if self.fd_queue.len() >= fd_count { | ||
| let value = value.clone(); | ||
| self.pending_message = None; | ||
|
|
||
| let fds: Vec<OwnedFd> = (0..fd_count) | ||
| .map(|_| self.fd_queue.pop_front().unwrap()) | ||
| .collect(); | ||
|
|
||
| let message = JsonRpcMessage::from_json_value(value)?; | ||
| return Ok(Some(MessageWithFds::new(message, fds))); | ||
| } | ||
| // Still waiting for more FDs | ||
| return Ok(None); | ||
| } |
There was a problem hiding this comment.
To improve efficiency, you can avoid cloning the serde_json::Value here. By using self.pending_message.take(), you can move the value out of the Option. If you find that there aren't enough file descriptors yet, you can move it back. This avoids a potentially expensive allocation and deep copy of the JSON value.
if let Some((value, fd_count)) = self.pending_message.take() {
if self.fd_queue.len() >= fd_count {
let fds: Vec<OwnedFd> = (0..fd_count)
.map(|_| self.fd_queue.pop_front().unwrap())
.collect();
let message = JsonRpcMessage::from_json_value(value)?;
return Ok(Some(MessageWithFds::new(message, fds)));
}
// Not enough FDs, put it back and wait for more.
self.pending_message = Some((value, fd_count));
return Ok(None);
}
src/transport.rs
Outdated
| let fds: Vec<OwnedFd> = (0..fd_count) | ||
| .map(|_| self.fd_queue.pop_front().unwrap()) | ||
| .collect(); | ||
|
|
||
| let message = JsonRpcMessage::from_json_value(value)?; | ||
| return Ok(Some(MessageWithFds::new(message, fds))); |
There was a problem hiding this comment.
8aa2697 to
5e56841
Compare
Looking at bootc-dev#9 basically the spec had an internal contradiction one part mandating returning an error with fewer than expected FDs, but a whole other section explaining how to handle more FDs than fit in a single sendmsg clal. Fix this by clarifying and cross referencing how to handle the latter case. Assisted-by: OpenCode (Claude Opus 4) Signed-off-by: Colin Walters <walters@verbum.org>
|
OK yes sorry, this was buggy...I did a PR for the spec here #10 |
5e56841 to
33b9744
Compare
cgwalters
left a comment
There was a problem hiding this comment.
Assisted-by: OpenCode (Claude Opus 4)
AI-generated review. Comments prefixed with AI: are unedited.
| buffer: Vec<u8>, | ||
| fd_queue: VecDeque<OwnedFd>, | ||
| /// A fully parsed JSON message waiting for its FDs to arrive. | ||
| pending_message: Option<(serde_json::Value, usize)>, |
There was a problem hiding this comment.
AI: Important: When the connection closes while pending_message is Some, read_more_data() returns Error::ConnectionClosed and the pending message is silently abandoned. The caller gets ConnectionClosed instead of MismatchedCount, losing diagnostic info about expected vs. found FD counts.
Worse, receive_opt() converts ConnectionClosed into Ok(None), making a protocol error look like a clean shutdown.
Suggest intercepting this in receive():
pub async fn receive(&mut self) -> Result<MessageWithFds> {
loop {
if let Some(message) = self.try_parse_message()? {
return Ok(message);
}
match self.read_more_data().await {
Err(Error::ConnectionClosed) if self.pending_message.is_some() => {
let (_, fd_count) = self.pending_message.take().unwrap();
return Err(Error::MismatchedCount {
expected: fd_count,
found: self.fd_queue.len(),
});
}
Err(e) => return Err(e),
Ok(()) => {}
}
}
}This preserves the "wait for batched FDs" behavior while restoring the diagnostic when the connection actually dies. It also means receive_opt() correctly propagates MismatchedCount as an error rather than swallowing it as Ok(None).
The existing tests test_mismatched_fd_count_error and test_fds_field_mismatch_too_few_fds should also be tightened to assert on the specific error variant.
src/transport.rs
Outdated
|
|
||
| fn try_parse_message(&mut self) -> Result<Option<MessageWithFds>> { | ||
| // Check if we have a pending message waiting for FDs. | ||
| if let Some((value, fd_count)) = self.pending_message.take() { |
There was a problem hiding this comment.
AI: (low) Worth noting that while a message is pending, all subsequent message parsing is blocked — even messages needing 0 FDs. This is correct for maintaining FIFO ordering on a Unix socket, but a brief comment documenting this serialization property would help future readers.
tests/integration_tests.rs
Outdated
| let listener = tokio::net::UnixListener::bind(&socket_path).unwrap(); | ||
|
|
||
| let server_handle = tokio::spawn(async move { | ||
| if let Ok((stream, _)) = listener.accept().await { |
There was a problem hiding this comment.
AI: (low) Per REVIEW.md: "Avoid the if let Ok(v) = ... { } pattern by default." If accept() fails, the test body is silently skipped and the test passes vacuously. Consider .unwrap() instead. (Pre-existing pattern in other tests, but worth not propagating further.)
src/transport.rs
Outdated
| value: serde_json::Value, | ||
| fd_count: usize, | ||
| ) -> Result<MessageWithFds> { | ||
| let fds: Vec<OwnedFd> = (0..fd_count) |
There was a problem hiding this comment.
I think this would be more elegant with fd_queue.drain()
src/transport.rs
Outdated
| fn try_parse_message(&mut self) -> Result<Option<MessageWithFds>> { | ||
| // Check if we have a pending message waiting for FDs. | ||
| if let Some((value, fd_count)) = self.pending_message.take() { | ||
| if self.fd_queue.len() >= fd_count { |
There was a problem hiding this comment.
i think with Rust 2024 edition we could fold this conditional into the above and avoid a take -> put cycle
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
33b9744 to
fdb2e54
Compare
src/transport.rs
Outdated
| Err(Error::ConnectionClosed) if self.pending_message.is_some() => { | ||
| // Connection closed while waiting for FDs — per spec | ||
| // Section 5, Step 4 this is a Mismatched Count error. | ||
| let (_, fd_count) = self.pending_message.take().unwrap(); |
There was a problem hiding this comment.
I think we can also avoid an is_some() + unwrap() here by adding to the match above
src/transport.rs
Outdated
| found: self.fd_queue.len(), | ||
| }); | ||
| } | ||
| debug!( |
There was a problem hiding this comment.
Minor nit i'd downgrade this to trace
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
fdb2e54 to
1b39027
Compare
Looking at #9 basically the spec had an internal contradiction one part mandating returning an error with fewer than expected FDs, but a whole other section explaining how to handle more FDs than fit in a single sendmsg clal. Fix this by clarifying and cross referencing how to handle the latter case. Assisted-by: OpenCode (Claude Opus 4) Signed-off-by: Colin Walters <walters@verbum.org>
When the sender batches file descriptors across multiple sendmsg() calls, the receiver may parse the complete JSON message before all FDs have arrived. Previously this returned a MismatchedCount error immediately.
Buffer the parsed message in a pending_message field and let the receive() loop continue calling read_more_data() until enough FDs have been collected.
Assisted-by: Claude Opus 4.6 noreply@anthropic.com
@cgwalters does this make any sense? The issue was found by Claude and it is required to use the crate in composefs/composefs-rs#228