📖 Documentation Preview: https://reflectapi-docs-preview-pr-107.partly.workers.dev (updated automatically from commit 903853d)
```rust
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "output_kind", rename_all = "snake_case")]
pub enum OutputType {
```
do we really need this? It seems like handling a partial case of what TypeReference can already do. A TypeReference to Stream with ItemType and ItemErrorType would carry exactly this information anyway.
That seems like a harder and worse way of implementing this. I think it should be a first-class concept rather than some magic type.
```rust
H: crate::Input + serde::de::DeserializeOwned + Send + 'static,
O: crate::Output + serde::ser::Serialize + Send + 'static,
E: crate::Output + serde::ser::Serialize + crate::StatusCode + Send + 'static,
I: Input + DeserializeOwned + Send + 'static,
```
I prefer full paths, as it is clear what is what in reviews.
There is no ambiguity here though, right? Input/Output/StatusCode are our own traits; Serialize/Deserialize are effectively known across the entire ecosystem. It significantly reduces line noise to not have them fully qualified.
```rust
///
/// This method takes a stream handler function and a closure that configures the
/// route's metadata (like its name, path, and description) using a [`RouteBuilder`].
pub fn stream_route<F, St, R, I, O, E1, E2, H>(
```
where are the 2 error types coming from? I think we can assume that a notification item is always a success type without an error part.
That's too strict though: all IO is fallible, so how would we handle that nicely without the inner error type? It's optional anyway, so it's not forced, similar to how existing handlers work. `()` or `Ok(())` are valid return types thanks to `IntoResult`.
```rust
impl<Item> Unpin for Unfold<Item> {}
}

pub struct Map<Item> {
```
could you please remind me why we need all of this?
So we can test that the generated Rust code typechecks. We need the type definitions of dependencies to typecheck against.
```typescript
): Promise<Result<AsyncIterable<O>, Err<E>>> {
  let hdrs: Record<string, string> = {
    "content-type": "application/json",
    "accept": "text/event-stream",
```
that means we also serialize notifications as JSON? I thought reflectapi was agnostic to the encoding, msgpack vs JSON. Previously the rule was: the server responds with the same encoding/packing as the request used. I suggest we keep following this rule.
I don't think any of the clients actually support MessagePack right now. That can be future work, I think.
Also, SSE requires UTF-8, so we'd have to layer base64 or something on top of MessagePack.
```typescript
  }
}

async function* __sse_to_async_iterable<O>(
  try {
    while (true) {
      if (options?.signal?.aborted) break;
      const { done, value } = await reader.read();
```
does this give a complete message separated by 2 newlines? Or can it be cut in the middle, as a pure HTTP/TCP stream?
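For reference, `reader.read()` yields raw transport chunks, which can split an SSE event anywhere, so the client has to buffer and only emit once the blank-line delimiter is seen. A minimal stdlib sketch of that framing, assuming `\n\n`-delimited events (names here are illustrative, not the actual generated client code):

```rust
// Accumulate raw chunks and split off only complete events, i.e. spans
// terminated by the "\n\n" delimiter; the unterminated tail stays buffered.
fn push_chunk(buffer: &mut String, chunk: &str, events: &mut Vec<String>) {
    buffer.push_str(chunk);
    while let Some(pos) = buffer.find("\n\n") {
        let event = buffer[..pos].to_string();
        buffer.drain(..pos + 2); // drop the event and its delimiter
        events.push(event);
    }
}

fn main() {
    let mut buffer = String::new();
    let mut events = Vec::new();
    // One event split across two reads, followed by a second full event.
    push_chunk(&mut buffer, "data: {\"id\"", &mut events);
    push_chunk(&mut buffer, ":1}\n\ndata: {\"id\":2}\n\n", &mut events);
    assert_eq!(events, vec!["data: {\"id\":1}", "data: {\"id\":2}"]);
    println!("{events:?}");
}
```

This is only a sketch of the framing concern; a real parser would also accept `\r\n\r\n` delimiters as the SSE spec allows.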
```typescript
if (options?.signal?.aborted) break;
const { done, value } = await reader.read();
if (done) break;
yield JSON.parse(value.data) as O;
```
as far as I can see, the protocol allows an event type prefix (`event: `). I suggest we ignore it and hardcode the server side to just `notification`.
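For reference, the SSE field syntax being discussed can be sketched with a tiny stdlib parser: each event block consists of `field: value` lines, `event:` names the event type (defaulting to `message`), and multiple `data:` lines join with `\n`. The function name and the `notification` event type are illustrative, not reflectapi code:

```rust
// Parse one SSE event block (the lines between two blank-line delimiters).
// Per the spec, a single leading space after the colon is stripped.
fn parse_sse_event(block: &str) -> (String, String) {
    let mut event = String::from("message"); // spec default event type
    let mut data_lines = Vec::new();
    for line in block.lines() {
        if let Some(v) = line.strip_prefix("event:") {
            event = v.strip_prefix(' ').unwrap_or(v).to_string();
        } else if let Some(v) = line.strip_prefix("data:") {
            data_lines.push(v.strip_prefix(' ').unwrap_or(v).to_string());
        }
        // id:, retry:, and ":"-prefixed comment lines are ignored in this sketch
    }
    (event, data_lines.join("\n"))
}

fn main() {
    let (event, data) = parse_sse_event("event: notification\ndata: {\"id\":1}");
    assert_eq!(event, "notification");
    assert_eq!(data, "{\"id\":1}");
    println!("{event}: {data}");
}
```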
```typescript
const { done, value } = await reader.read();
if (done) break;
const parsed = JSON.parse(value.data);
if ("__reflectapi_reserved_stream_error" in parsed) {
```
how can that happen? I think only network error types are possible here, and we assume the server does not send errors in the event stream. Only the original response error needs to be handled.
Well, I implemented it such that it supports per-event errors, and I still think that's a useful thing to support.
```rust
    output_type: Some(output_type),
} => type_ref_to_python_type_simple(output_type, schema, implemented_types, &[])?,
OutputType::Complete { output_type: None } => "Any".to_string(),
OutputType::Stream { .. } => unreachable!("stream endpoints should be filtered out"),
```
do you mean the Python client will not support these?
I'll leave it to @hardbyte to decide if it's necessary.
```rust
match body.next().await {
    Some(Ok(chunk)) => {
        buffer.push_str(&String::from_utf8_lossy(&chunk));
    }
    Some(Err(_)) => {
        return Some((Err("stream error".to_string()), (body, buffer, data)));
```
🔴 At rt.rs:267, String::from_utf8_lossy(&chunk) permanently corrupts multi-byte UTF-8 sequences (emoji, CJK characters, accented letters) that are split across HTTP chunk boundaries by replacing partial bytes with U+FFFD. Since serde_json::to_string on the server side outputs raw UTF-8 bytes (not \uXXXX escapes), any stream item with non-ASCII content is vulnerable to silent data corruption or JSON parse failures on the client.
Extended reasoning...
What the bug is and how it manifests
At reflectapi/src/rt.rs line 267, the SSE stream parser converts each raw HTTP body chunk to a String using String::from_utf8_lossy(&chunk). The from_utf8_lossy function replaces any invalid or incomplete UTF-8 byte sequences with the Unicode replacement character U+FFFD (0xEF 0xBF 0xBD). Since HTTP/1.1 chunked transfer encoding can split the body at any byte boundary, a multi-byte UTF-8 sequence (2–4 bytes) can easily be split across two successive chunks.
The specific code path that triggers it
The server serializes each SSE stream item in handler.rs at line 483:

```rust
Ok(v) => serde_json::to_string(&v).map_err(|err| err.to_string()),
```

This outputs raw UTF-8 bytes — for example, the string "café" is serialized as the bytes 22 63 61 66 C3 A9 22. When this 7-byte payload is split across two chunks (e.g., the first chunk ends at the C3 byte, the second starts at A9), from_utf8_lossy replaces the lone 0xC3 with U+FFFD and pushes that into the buffer String. When the second chunk arrives with 0xA9, that single byte is also invalid UTF-8 and gets replaced with another U+FFFD. The replacement characters are now permanently stored in buffer with no mechanism to remove them.
Why existing code doesn't prevent it
The buffer accumulates decoded String fragments across chunk iterations. Once from_utf8_lossy substitutes U+FFFD into the buffer, there is no way to recover the original bytes — the raw bytes are gone. The fix would require maintaining a Vec<u8> byte accumulator and using std::str::from_utf8 (which returns an error rather than silently substituting) to detect incomplete sequences, holding the trailing incomplete bytes to be prepended to the next chunk.
Addressing the refutation
The sole refutation claims that serde_json::to_string() escapes all non-ASCII as \uXXXX, making the output pure 7-bit ASCII. This is factually incorrect. The default serde_json serializer outputs raw UTF-8 bytes for string values; it does NOT escape non-ASCII characters to \uXXXX unless you use a custom serializer with that behavior. This is well-established serde_json behavior and is confirmed by all three verifiers who independently checked. Running `serde_json::to_string("hello, 世界")` in Rust produces `"hello, 世界"` with raw 3-byte UTF-8 sequences, not escaped codepoints.
Impact
Any API response containing non-ASCII data (pet names with accents, internationalized error messages, arbitrary user content) would be silently corrupted. The client would receive a JSON string where multi-byte characters near chunk boundaries are replaced with the U+FFFD replacement character, either causing JSON parse failures (if the corruption breaks the JSON structure) or returning semantically wrong data (corrupted string values) without any error.
Step-by-step proof

1. The server has a Pet with name "Résumé" (8 bytes in UTF-8: 52 C3 A9 73 75 6D C3 A9). `serde_json::to_string(&pet)` produces `{"name":"Résumé",...}` as raw UTF-8 bytes.
2. HTTP transport splits the body: chunk 1 ends mid-sequence after the leading byte `\xC3`, chunk 2 begins with the continuation byte `\xA9`.
3. Processing chunk 1: `String::from_utf8_lossy` sees the incomplete leading byte 0xC3 at the end and replaces it with U+FFFD. The buffer now contains `..."R\u{FFFD}`.
4. Processing chunk 2: the byte 0xA9 is a continuation byte with no leading byte, so it is also replaced with U+FFFD. The buffer now contains `..."R\u{FFFD}\u{FFFD}sumé"...` (the final `\xC3\xA9` survives only if both bytes land in the same chunk).
5. The resulting JSON name is permanently corrupted; the client deserializes the wrong value.

Fix: replace `String::from_utf8_lossy(&chunk)` with a `Vec<u8>` accumulator and process UTF-8 boundaries incrementally using `std::str::from_utf8`.
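A minimal stdlib sketch of that fix: keep undecoded bytes in a `Vec<u8>` and only convert the longest valid UTF-8 prefix, holding an incomplete trailing sequence for the next chunk (`drain_valid_utf8` is an illustrative name, not the actual reflectapi code):

```rust
// Drain the longest valid UTF-8 prefix from `pending`, leaving any
// incomplete trailing sequence in place for the next chunk.
fn drain_valid_utf8(pending: &mut Vec<u8>) -> String {
    match std::str::from_utf8(pending) {
        Ok(s) => {
            let out = s.to_owned();
            pending.clear();
            out
        }
        Err(e) => {
            let valid = e.valid_up_to();
            // error_len() == None means the tail is an *incomplete* sequence:
            // keep it buffered. A full implementation must also handle
            // error_len() == Some(_) (truly invalid bytes) by reporting an error.
            let out = std::str::from_utf8(&pending[..valid]).unwrap().to_owned();
            pending.drain(..valid);
            out
        }
    }
}

fn main() {
    // "café" as UTF-8, split in the middle of the 2-byte 'é' (C3 A9).
    let chunks: [&[u8]; 2] = [b"caf\xC3", b"\xA9"];
    let mut pending = Vec::new();
    let mut decoded = String::new();
    for chunk in chunks {
        pending.extend_from_slice(chunk);
        decoded.push_str(&drain_valid_utf8(&mut pending));
    }
    assert_eq!(decoded, "café"); // no U+FFFD corruption
    println!("{decoded}");
}
```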
```rust
match body.next().await {
    Some(Ok(chunk)) => {
        buffer.push_str(&String::from_utf8_lossy(&chunk));
    }
    Some(Err(_)) => {
        return Some((Err("stream error".to_string()), (body, buffer, data)));
    }
    None => {
        if !data.is_empty() {
            if data.ends_with('\n') {
                data.pop();
```
🔴 In __parse_sse_stream, transport errors from the underlying byte stream are silently discarded with Some(Err(_)) and replaced with the hardcoded string "stream error". Because the function's error type is only bounded by `Send + 'static` (no Display or Debug), the actual error value cannot be formatted: any network disconnect, timeout, or HTTP error will surface to callers as the opaque string "stream error" with zero diagnostic context, making production debugging of SSE streaming failures essentially impossible.
Extended reasoning...
What the bug is and how it manifests
In reflectapi/src/rt.rs, the __parse_sse_stream function drives SSE parsing via futures_util::stream::unfold. When the underlying byte stream yields an error (e.g., a network disconnect, timeout, connection reset, or HTTP/2 stream error), the closure hits this arm:
```rust
Some(Err(_)) => {
    return Some((Err("stream error".to_string()), (body, buffer, data)));
}
```

The actual error value is discarded using the `_` wildcard, and the fixed string "stream error" is returned in its place.
The specific code path that triggers it
Callers __stream_request_impl and __stream_request_fallible_impl both call __parse_sse_stream and then map stream errors into Error::Protocol { info: e, stage: ProtocolErrorStage::DeserializeStreamItem("") }. The info field, which is supposed to carry diagnostic detail, will always be the string "stream error" regardless of root cause.
Why existing code does not prevent it
The function signature constrains the error type to only `Send + 'static`; there is no Display or Debug bound. Calling .to_string() on the error value would not even compile. The type system actively prevents propagating any information from the error value, making the discard unavoidable without an API change.
What the impact would be
In production, any failure in the SSE byte stream—TCP reset, HTTP timeout, reqwest error with connection context, chunked-encoding failure, TLS error—will surface to users and operators as the single string "stream error". There is no way to distinguish categories of failure or correlate with underlying network errors. This is new code introduced by the PR and makes diagnosing streaming failures essentially impossible.
How to fix it
Add a Display (or Debug) bound to the error type parameter in __parse_sse_stream, then use it to format the actual error:
```rust
pub fn __parse_sse_stream<E: std::fmt::Display + Send + 'static>(
    body: Pin<Box<dyn Stream<Item = Result<bytes::Bytes, E>> + Send>>,
) -> BoxStream<Result<String, String>> {
    // ...
    Some(Err(e)) => {
        return Some((Err(format!("stream error: {e}")), (body, buffer, data)));
    }
}
```

This bound must also propagate through __stream_request_impl, __stream_request_fallible_impl, and the Client trait (requiring `C::Error: Display + Send + 'static`). Both existing Client implementations (reqwest::Client and reqwest_middleware::ClientWithMiddleware) already have error types that implement Display, so no existing code would break.
Step-by-step proof

1. A client calls `cdc_events()` on the generated Rust client.
2. This invokes `__stream_request_impl`, which calls `client.stream_request()`, returning a pinned byte stream.
3. `__parse_sse_stream` wraps the byte stream inside an `unfold`.
4. The TCP connection drops mid-stream; reqwest yields `Err(reqwest::Error { kind: Decode, ... })` from `bytes_stream()`.
5. The `unfold` closure matches `Some(Err(_))`; the `reqwest::Error` (which would print something like "error decoding response body") is silently dropped.
6. `Some((Err("stream error".to_string()), ...))` is returned.
7. The caller receives `Error::Protocol { info: "stream error", stage: DeserializeStreamItem("") }`.
8. The operator sees only "stream error" in logs, with no further context to identify the root cause.
```typescript
const parsed = JSON.parse(value.data);
if ("__reflectapi_reserved_stream_error" in parsed) {
  yield new Result<O, IE>({ err: parsed.__reflectapi_reserved_stream_error as IE });
} else {
  yield new Result<O, IE>({ ok: parsed as O });
}
```
🔴 In __sse_to_async_iterable_fallible (lib.ts line 341), the check `"__reflectapi_reserved_stream_error" in parsed` throws a TypeError at runtime when parsed is a non-object JSON value (null, number, boolean, or string). Since the server serializes success items as raw values without wrapping them in an object, any fallible stream where the success type O is a Rust primitive (i32, bool, String, or a None Option) will crash on every success item. The fix is to guard the check: `if (parsed !== null && typeof parsed === "object" && "__reflectapi_reserved_stream_error" in parsed)`.
Extended reasoning...
The bug is in __sse_to_async_iterable_fallible (lib.ts line 341). The newly introduced code does:
```typescript
const parsed = JSON.parse(value.data);
if ("__reflectapi_reserved_stream_error" in parsed) { ... }
```
JavaScript's in operator throws TypeError when its right-hand operand is not an object. Numbers, booleans, strings, and null all cause this error — e.g. TypeError: Cannot use the in operator to search for __reflectapi_reserved_stream_error in 42.
On the server side (handler.rs), success items are serialized with serde_json::to_string(&v), which produces any valid JSON value depending on the type O. If O is i32, the SSE data field is 42; if O is bool, it is true; if O is String, it is "hello"; if O is Option<T> when None, it is null. JSON.parse on any of these returns a JavaScript primitive, not an object, so the in check throws.
The Rust type constraint is only O: serde::de::DeserializeOwned, which includes all primitives. Nothing in the type system prevents a user from registering a fallible stream handler with a primitive success type. Error items are always JSON objects (wrapped in StreamItemError { __reflectapi_reserved_stream_error: ... }), but success items are not.
Concrete proof walkthrough: (1) Server has a fallible stream with O=i32. (2) Server yields value 42; serialized as the string "42" in the SSE data field. (3) Client receives the SSE event with data "42". (4) JSON.parse("42") returns the JS number 42. (5) "__reflectapi_reserved_stream_error" in 42 throws TypeError: Cannot use the in operator to search for __reflectapi_reserved_stream_error in 42. (6) The async generator crashes; the stream consumer gets an unhandled exception.
Impact: fallible streaming is completely broken for any primitive success type. Given that O: serde::de::DeserializeOwned is the only constraint, this is a realistic scenario.
Fix: guard the `in` check with a type and null check:

```typescript
if (parsed !== null && typeof parsed === "object" && "__reflectapi_reserved_stream_error" in parsed)
```

This is safe because the error sentinel key is only ever present in object-shaped values (the StreamItemError struct serialization), so the guard does not break error detection.
```rust
    });
}

let st = handler(state, input, input_headers).map_err(|err| HandlerOutput {
    code: err.status_code(),
    body: serde_json::to_vec(&err).unwrap().into(),
    headers: response_headers,
})?;

Ok(st.map(Ok).and_then(move |res| {
    let res = res.into_result();
```
🔴 In stream_handler_wrap (reflectapi/src/builder/handler.rs:475), serde_json::to_vec(&err).unwrap() panics if the error type E1 fails to serialize — for example, due to a custom Serialize impl that returns an error, or f64::NAN/f64::INFINITY values. The non-streaming handler_wrap handles serialization failures gracefully by returning an HTTP 500 response, but this newly-introduced streaming path does not, creating an inconsistency that can crash the server process.
Extended reasoning...
What the bug is
In the newly-introduced stream_handler_wrap function in reflectapi/src/builder/handler.rs, when the user-supplied handler function returns an Err(E1), the code at line 475 does:
```rust
let st = handler(state, input, input_headers).map_err(|err| HandlerOutput {
    code: err.status_code(),
    body: serde_json::to_vec(&err).unwrap().into(), // panics on serialization failure
    headers: response_headers,
})?;
```

The `.unwrap()` call will panic and crash the server process (or abort the tokio task) if `serde_json::to_vec` returns an error.
The specific code path that triggers it
1. A client calls a streaming endpoint.
2. The registered handler returns `Err(e)` where `e: E1`.
3. `stream_handler_wrap` tries to serialize `e` via `serde_json::to_vec(&err)`.
4. If the serialization fails (a custom `Serialize` impl, an `f64::NAN` field, a non-string map key), `.unwrap()` panics.
5. The panic propagates through the axum handler, crashing the process or aborting the task.
Why existing code does not prevent it
The E1 type bound only requires serde::ser::Serialize, which is a fallible trait — its serialize method returns Result<(), Error>. The #[derive(serde::Serialize)] macro produces infallible implementations for most common types, which is why this does not trigger in the existing demo types. However, any user defining a custom Serialize implementation or using a type with f64::NaN/f64::Infinity could trigger the panic.
Contrast with handler_wrap
The non-streaming handler_wrap (lines 398-413) explicitly handles serialization failure and returns HTTP 500 instead of panicking. The streaming path should mirror this pattern.
Impact
A panic in an axum handler aborts the task serving that request, and takes down the whole process if the binary is built with panic = "abort". Either way, a single malformed error value from one streaming request can disrupt the server.
How to fix it
Replace:

```rust
body: serde_json::to_vec(&err).unwrap().into(),
```

with something like:

```rust
body: serde_json::to_vec(&err)
    .unwrap_or_else(|e| format!("Failed to serialize error: {e}").into_bytes())
    .into(),
```

Or return a 500 HandlerOutput if serialization fails, mirroring the handler_wrap approach.
```json
"functions": [
  {
    "name": "inout_test",
    "path": "",
    "input_type": {
      "name": "reflectapi_demo::tests::serde::Either"
    },
    "output_type": {
      "name": "reflectapi_demo::tests::serde::Either"
    },
    "serialization": [
      "json",
      "msgpack"
    ]
  }
```
🔴 The newly added snapshot reflectapi_demo__tests__serde__union_docs.snap is missing "output_kind": "complete" in the function definition and is missing reflectapi::Empty in the output_types section. When the corresponding union_docs test is added, it will fail immediately against the actual serialized output.
Extended reasoning...
What the bug is and how it manifests
The PR introduces OutputType as a tagged enum (#[serde(tag = "output_kind", rename_all = "snake_case")]) and changes Function.output_type to use #[serde(flatten)]. This means every serialized Function struct now includes an output_kind field at the top level (e.g., "output_kind": "complete" for the Complete variant). Additionally, the PR changed the type registry so that reflectapi::Empty now appears in output_types (not just input_types), as confirmed by every other updated snapshot.
The newly added snapshot reflectapi_demo__tests__serde__union_docs.snap has both these fields missing/wrong.
The specific code path that triggers it
In reflectapi-schema/src/lib.rs, the OutputType enum is declared as:

```rust
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "output_kind", rename_all = "snake_case")]
pub enum OutputType {
    Complete {
        #[serde(skip_serializing_if = "Option::is_none", default)]
        output_type: Option<TypeReference>,
    },
    Stream { ... },
}
```

And Function has:

```rust
#[serde(flatten)]
pub output_type: OutputType,
```

When a Function is serialized, the flattened Complete variant always emits `"output_kind": "complete"` as a sibling field alongside `"output_type"`. The snapshot at lines 7-21 has `"output_type": {...}` but not `"output_kind": "complete"`.
Why existing code does not prevent it
The snapshot was committed in a state that predates (or was generated without) the OutputType serialization changes. Since the corresponding test function union_docs does not exist anywhere in reflectapi-demo/src/tests/serde.rs (or anywhere else in the codebase), insta's snapshot runner never attempts to match against this file, so CI does not currently fail.
Impact
When the union_docs test is eventually written and run, insta will compare the actual serialized output (which includes "output_kind": "complete" and reflectapi::Empty in output_types) against this stale snapshot and immediately fail. The snapshot is also misleading—it suggests a correct baseline exists when it does not.
How to fix it

1. Add the `union_docs` test function to reflectapi-demo/src/tests/serde.rs, defining the `Either` enum it references.
2. Run `cargo insta review` (or `INSTA_UPDATE=always cargo test`) to regenerate the snapshot with the correct `output_kind` field and `reflectapi::Empty` in output_types.
Step-by-step proof

1. The snapshot function definition (lines 8-21) shows `"output_type": { "name": "reflectapi_demo::tests::serde::Either" }` with no `output_kind` field.
2. With `#[serde(flatten)]` on `output_type: OutputType` and `OutputType::Complete { output_type: Some(...) }`, serde serializes this as `{ "output_kind": "complete", "output_type": { "name": "..." } }`.
3. Every other snapshot updated in this PR (e.g., reflectapi_struct_empty.snap) correctly includes `"output_kind": "complete"` between input_type and output_type.
4. The snapshot's output_types section (lines 79-134) begins with `reflectapi::Infallible` but lacks `reflectapi::Empty`; all other updated snapshots have `reflectapi::Empty` first in output_types.
5. Grepping the codebase for `union_docs` or the `Either` type confirms no such test function exists, meaning this snapshot was committed without ever being validated by a test run.
```rust
write!(
    out,
    " {}{}pub async fn {}(&self, input: {}, headers: {})\n\
    -> reflectapi::rt::FallibleStreamResponse<{}, {}, {}, C::Error>\n\
```
why is it not the same Response type as in the Complete case, just with the generic parameter set to a Stream? Can we go without extra custom response types here?
As in, just inline the type alias? It's just huge, long, and noisy.
```rust
if let Some(item_error_type) = &self.item_error_type {
    format!(
        "function {name}(client: Client) {{\n\
        return (input: {input_type}, headers: {input_headers}, options?: RequestOptions) => __stream_request_fallible<\n\
```
same question here: could we reuse the original normal Response type instead of introducing a new one?
Isn't this already what you want?
```typescript
get_first: (
  input: {},
  headers: myapi.proto.Headers,
  options?: RequestOptions,
) => AsyncResult<
  myapi.model.output.Pet | null,
  myapi.proto.UnauthorizedError
>;
/**
 * Stream of change data capture events for pets
 */
cdc_events: (
  input: {},
  headers: myapi.proto.Headers,
  options?: RequestOptions,
) => AsyncResult<
  AsyncIterable<myapi.model.output.Pet>,
  myapi.proto.UnauthorizedError
>;
```
```rust
pub type StreamResponse<T, AE, NE> = Result<BoxStream<Result<T, Error<AE, NE>>>, Error<AE, NE>>;

pub type FallibleStreamResponse<T, IE, AE, NE> =
```
All these IE, AE, NE are hard to read; it's unclear what each is for. They either need comments or proper longer names for the generic params. Also consider whether we can cut back complexity by assuming errors never come from the server, so only network disconnects need to be considered.
Please respond to one of the questions about how the server should handle errors while processing the stream.
```rust
let mut header_map = http::HeaderMap::new();
header_map.insert(
    http::header::ACCEPT,
    http::HeaderValue::from_static("text/event-stream"),
```
are we definitely bound to require this Accept header? Might it be easier to assume that header by default? Also, could we assume the server only outputs newline-separated JSON lines (or zero-byte-separated, in the msgpack case)?
```toml
# mandatory 3rd party dependencies
serde = { version = "1.0.197", features = ["derive"] }
futures-util = "0.3"
```
that is optional, isn't it? It shouldn't always be required.
We can feature-flag it behind sse if you really want, but anyone who uses async will have this in their tree anyway.