feat: implement queue routing and worker subscription model#428
Merged
niteshpurohit merged 6 commits intomainfrom Apr 9, 2026
Merged
feat: implement queue routing and worker subscription model#428niteshpurohit merged 6 commits intomainfrom
niteshpurohit merged 6 commits intomainfrom
Conversation
- Introduced a new Subscription class to manage worker queue and handler subscriptions. - Updated worker reserve logic to match jobs based on both queue and handler criteria. - Enhanced error handling for duplicate queue and handler names. - Improved documentation to clarify the new routing and subscription behavior. - Added tests to validate subscription functionality and error handling.
- Introduced a new HandlerMatcher class to encapsulate subscription handler matching for reservation scans. - Removed the ALL_HANDLER_NAMES constant and updated the reserve method to accept handler_names as nil. - Updated find_reserved_job and matching_job_id_for methods to utilize the new HandlerMatcher for improved clarity and functionality. - Adjusted type signatures in the RBS files to reflect the changes in method parameters and return types.
There was a problem hiding this comment.
Pull request overview
Implements queue routing and a worker subscription model so reservation only occurs when both the job’s queue and handler match what a worker has subscribed to, establishing a stable routing boundary for later reliability features.
Changes:
- Added
Worker::Subscriptionand wired workers to reserve jobs using(queues, handler_names)matching rather than queue-only matching. - Updated the in-memory queue store’s
reserveAPI to support subscription-aware scans and queue-order preference. - Tightened validation and documentation around duplicate/empty queue and handler configurations, with expanded test coverage.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/pages/runtime/workers.md | Documents explicit routing + subscription matching behavior. |
| docs/pages/runtime/job-model.md | Clarifies reserve semantics now respect subscription boundaries. |
| core/karya/README.md | Summarizes new routing/subscription model at the library level. |
| core/karya/lib/karya/worker/subscription.rb | Introduces immutable-ish subscription object for queue+handler matching. |
| core/karya/lib/karya/worker.rb | Switches worker reserve logic to subscription-aware reserve call. |
| core/karya/lib/karya/worker/configuration.rb | Builds and exposes subscription derived from queues + handler registry. |
| core/karya/lib/karya/worker/handler_registry.rb | Adds empty-handlers validation and exposes normalized handler names. |
| core/karya/lib/karya/worker_supervisor/handler_mapping.rb | Adds empty-handlers validation for supervisor handler mapping. |
| core/karya/lib/karya/queue_store/in_memory.rb | Implements subscription-aware reserve scanning across queues + handlers. |
| core/karya/lib/karya/queue_store/base.rb | Updates reserve contract to accept queue(s) + handler_names inputs. |
| core/karya/lib/karya/primitives/queue_list.rb | Enforces uniqueness of queues after normalization. |
| core/karya/lib/karya/cli/mapping_entry.rb | Rejects duplicate handler mappings at CLI parse time. |
| core/karya/lib/karya/cli/handler_parser.rb | Wires duplicate handler mapping errors to Thor::Error. |
| core/karya/sig/karya/worker/subscription.rbs | Adds RBS for new subscription class. |
| core/karya/sig/karya/worker.rbs | Exposes Worker#subscription in RBS. |
| core/karya/sig/karya/queue_store/base.rbs | Updates reserve signature for subscription-aware inputs. |
| core/karya/sig/karya/queue_store/in_memory.rbs | Updates reserve + adds HandlerMatcher typing. |
| core/karya/spec/... | Adds/updates tests for subscription matching, duplicates, and queue store reserve behavior. |
| core/karya/.reek.yml | Excludes new matcher method(s) from select smells. |
Comments suppressed due to low confidence (2)
core/karya/lib/karya/worker_supervisor/handler_mapping.rb:23
- HandlerMapping#normalize can silently overwrite handlers when two keys normalize to the same handler name (e.g., 'billing_sync' and ' billing_sync '). This makes configuration errors hard to detect and can route jobs to the wrong handler. Consider detecting normalized key collisions and raising InvalidWorkerSupervisorConfigurationError listing the duplicate normalized handler name(s).
def normalize
raise InvalidWorkerSupervisorConfigurationError, 'handlers must be a Hash' unless value.is_a?(Hash)
raise InvalidWorkerSupervisorConfigurationError, 'handlers must be present' if value.empty?
value.each_with_object({}) do |(name, handler), normalized|
normalized_name = Primitives::Identifier.new(:handler, name, error_class: InvalidWorkerSupervisorConfigurationError).normalize
normalized[normalized_name] = handler
end.freeze
core/karya/lib/karya/worker/handler_registry.rb:18
- HandlerRegistry#normalize currently overwrites entries when multiple input keys normalize to the same handler name, which can silently drop a handler mapping and produce a subscription that doesn’t reflect the intended configuration. Consider detecting normalized-name collisions and raising InvalidWorkerConfigurationError (similar to how duplicate queue names are rejected).
def initialize(value)
raise InvalidWorkerConfigurationError, 'handlers must be a Hash' unless value.is_a?(Hash)
raise InvalidWorkerConfigurationError, 'handlers must be present' if value.empty?
@value = value
@normalized_handlers = normalize
end
- Added validation to ensure handler_names is an Array in the HandlerMatcher class. - Introduced uniqueness check for handler names in the HandlerRegistry and HandlerMapping classes. - Updated Worker::Subscription to freeze after initialization to prevent further modifications. - Enhanced tests to cover new validation rules for handler names in both Worker and WorkerSupervisor classes.
- Added error handling for reserve method to ensure only one of `queue` or `queues` is provided. - Introduced a new method `normalize_reserve_queues` for better queue normalization logic. - Updated `find_reserved_job` and `matching_job_for` methods to return additional information. - Enhanced worker subscription validation to reject duplicate handler names. - Updated specs to cover new validation scenarios for queue reservation and worker subscription.
- Included the identifier primitive to enhance the queue store functionality. - This addition supports improved handling of job identifiers within the queue system.
- Changed @handler_names from an Array to a Hash to improve lookup efficiency. - Updated the initialization and normalization methods to reflect this change. - Ensures that handler names are stored as key-value pairs, enhancing data structure integrity.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
closes: #38