refactor(stream): unify inbound dispatch via RouteInbound#293
Merged
Conversation
Add MessageRouter.RouteInboundMessage as the symmetric server-side counterpart of RouteMessage. On the inbound side the high bit marks responses to this server's own calls (route to pending map) while low-bit IDs are new client requests (return false to caller). Apply the null object pattern to replace the nil PeerNode returned by AcceptPeer for regular clients. The new nilPeerNode type implements PeerNode by routing directly to the inbound stream via Send and always returning false from RouteInbound, eliminating all peerNode != nil branches from NodeStream. Rename PeerNode.RouteResponse to RouteInbound and delegate Node's implementation to the new RouteInboundMessage method. Update tests to use server-initiated IDs (ServerSequenceNumber) where appropriate and replace the reflect-based bool flag in TestAcceptPeer with concrete type comparison against *Node and *nilPeerNode. Fix a data race in the pre-existing TestRouterRouteMessage test by replacing time.Sleep with a done channel on mockRequestHandler.
Move client-initiated request dispatch out of NodeStream and into RouteInbound / RouteInboundMessage, so the full routing decision is encapsulated in the router and PeerNode rather than split across NodeStream and its callers. Key changes: - Store RequestHandler in MessageRouter (NewMessageRouter variadic arg) for both back-channel (RouteMessage) and server-side inbound dispatch (RouteInboundMessage); remove the handler field from stream.Server and the corresponding parameter from NewServer. - Change RouteInboundMessage signature: drop bool return value, add ctx, release, and send parameters. Client-initiated messages are now dispatched to the handler (or release() called) inside the method. release() is always called on all code paths. - Update PeerNode.RouteInbound interface to match: signature changes from (msg) bool to (ctx, msg, release, send). Node.RouteInbound and nilPeerNode.RouteInbound updated accordingly. - Thread the inboundManager.handler (renamed from selfHandler) into newInboundNode and nilPeerNode so every inbound node dispatches client requests to the same handler. - Simplify NodeStream recv loop: remove the if/continue branch; call peerNode.RouteInbound unconditionally for every received message. - Fix RouteMessage argument order: ctx moved before nodeID to match Go convention and match RouteInboundMessage. - Update all Route* doc comments to use consistent style with [Register] cross-references, and fix an incorrect inline comment about server-initiated IDs in RouteMessage. - Rewrite TestNodeRouteInbound and TestRouterRouteInboundMessage as four subtests each, covering pending delivery, stale absorption, nil-handler release, and handler dispatch.
Contributor
|
|
Overall Grade |
Security Reliability Complexity Hygiene |
Code Review Summary
| Analyzer | Status | Updated (UTC) | Details |
|---|---|---|---|
| Go | Mar 21, 2026 5:50p.m. | Review ↗ | |
| Shell | Mar 21, 2026 5:50p.m. | Review ↗ |
Contributor
There was a problem hiding this comment.
Pull request overview
Refactors the server-side inbound stream receive path so both client-initiated requests and server-initiated responses are dispatched through a single PeerNode.RouteInbound(...) entry point, aligning server-side dispatch behavior with the existing router abstraction.
Changes:
- Replaces
PeerNode.RouteResponse(msg) bool+ direct handler dispatch with unifiedPeerNode.RouteInbound(ctx, msg, release, send). - Introduces
MessageRouter.RouteInboundMessage(...)to demultiplex inbound server-side messages (client-initiated requests vs server-initiated responses). - Updates inbound manager behavior to return a concrete
nilPeerNodefor untracked clients, eliminating nil-guards inNodeStream, and updates tests accordingly.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| server.go | Updates stream.NewServer callsite for new constructor signature (handler no longer passed). |
| internal/stream/server.go | Removes asymmetric routing/handler path; always routes inbound via peerNode.RouteInbound and always sends via peerNode.Enqueue. |
| internal/stream/router.go | Adds RouteInboundMessage and updates router docs to cover both inbound/server-side and outbound/client-side routing cases. |
| node.go | Stores handler in router at inbound node construction; replaces Node.RouteResponse with Node.RouteInbound. |
| inbound_manager.go | Renames handler field, passes handler to inbound nodes, returns nilPeerNode for untracked connections, and adds nilPeerNode implementation. |
| internal/stream/router_test.go | Adds coverage for RouteInboundMessage and makes handler mock goroutine-safe. |
| node_test.go | Replaces RouteResponse tests with RouteInbound tests and updates inbound node construction signature. |
| inbound_manager_test.go | Updates TestAcceptPeer expectations to assert concrete returned type (*Node vs *nilPeerNode). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
After a stream.Send failure, gRPC cancels the stream context, but there is a race window before ctx.Done() propagates in the NodeStream goroutine. During that window, up to sendBufferSize additional messages can be dequeued from the finished channel and each hits another failing Send call. Add an atomic.Bool failed field to nilPeerNode and check it at the top of Enqueue. The first Send error sets the flag; all subsequent calls return immediately. The existing ctx.Done() select arm in NodeStream still handles goroutine termination; this change only eliminates the O(buffer) wasted gRPC send attempts in the interim.
There was a dependabot issue for grpc.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Unify inbound dispatch via RouteInbound
Refactor the inbound receive path to achieve dispatch symmetry: both
server-initiated responses and client-initiated requests now flow through a
single
PeerNode.RouteInboundcall instead of two separate code paths inNodeStream.Motivation
Previously,
NodeStreamininternal/stream/server.gohandled inboundmessages with an asymmetric two-step pattern:
peerNode.RouteResponse(msg)to try routing server-initiatedresponse IDs to pending calls; if that returns
false, fall through.server.handler.HandleRequest(...)directly for client-initiatedrequests.
This meant the
stream.Serverhad to carry aRequestHandlerfield, thePeerNodeinterface exposed a boolean-returningRouteResponsemethod, andAcceptPeerreturnednilfor untracked connections, requiring nil-guardsthroughout
NodeStream.Changes
internal/stream/server.goPeerNode.RouteResponse(msg) boolreplaced byPeerNode.RouteInbound(ctx, msg, release, send), which handles bothresponse delivery and request dispatch internally.
NewServerno longer accepts aRequestHandlerargument; the handler isnow registered at node construction time.
NodeStreamreceive loop reduced to a singlepeerNode.RouteInbound(...)call; nil-guard on
peerNodein the send path removed.internal/stream/router.goRouteInboundMessagemethod added toMessageRouter: the symmetricserver-side counterpart of
RouteMessage. Low-bit (client-initiated) IDsare dispatched to the handler; high-bit (server-initiated) IDs are
delivered to the pending call map.
NewMessageRouterdocumentation updated to describe both uses of thehandler.
node.gonewInboundNodegains ahandler stream.RequestHandlerparameter andpasses it to
NewMessageRouter, so the handler is stored in the routerrather than on the server.
Node.RouteResponsereplaced byNode.RouteInbound, which delegates torouter.RouteInboundMessage.inbound_manager.goselfHandlerfield renamed tohandlerto reflect that it now covers allinbound nodes, not only the self-node.
newInboundNodecall sites pass the handler.AcceptPeerreturns anilPeerNodevalue (instead ofnil) foruntracked connections, removing nil-guard complexity from
NodeStream.nilPeerNodetype implementsPeerNodefor regular clients that haveno back-channel capability:
RouteInbounddispatches to the handler (orcalls
release), andEnqueuewrites directly to the stream.server.gostream.NewServercall updated to drop the now-removed handler argument.Tests
inbound_manager_test.go:TestAcceptPeerupdated to assert the concretetype returned (
*Nodevs*nilPeerNode) usingreflect.TypeOfinstead ofa nil check.
internal/stream/router_test.go: newTestRouterRouteInboundMessagetestcovers all four cases (client-initiated with/without handler,
server-initiated pending delivery, server-initiated stale absorption).
mockRequestHandlermade goroutine-safe withatomic.Booland adonechannel;
TestRouterRouteMessageupdated to use the new constructor and achannel-based wait instead of
time.Sleep.node_test.go:TestNodeRouteResponsereplaced byTestNodeRouteInboundwith four subtests mirroring the router tests.
No behaviour change
This is a pure refactor. The observable behaviour of all call paths is
identical to before; only the internal structure of the dispatch loop and the
interface surface are simplified.