feat: implement backpressure policies and job attributes#429
feat: implement backpressure policies and job attributes#429niteshpurohit wants to merge 6 commits intomainfrom
Conversation
- Introduced `Karya::Backpressure::PolicySet` to manage concurrency and rate-limiting policies. - Added `priority`, `concurrency_key`, and `rate_limit_key` attributes to `Karya::Job` for enhanced job management. - Implemented `Karya::QueueStore::InMemory` to prioritize jobs based on their attributes while reserving. - Created tests to validate the behavior of job reservations under various priority and concurrency scenarios. - Updated documentation to reflect the new backpressure features and job model enhancements.
There was a problem hiding this comment.
Pull request overview
Adds first-class backpressure controls (priority, concurrency caps, fixed-window rate limits) to Karya’s job model and the in-memory queue store, and documents how workers/queue stores interact with these features.
Changes:
- Introduces
Karya::Backpressure::PolicySetwith concurrency and rate-limit policy objects and normalization/validation. - Extends
Karya::Jobwithpriority,concurrency_key, andrate_limit_key, and updatesQueueStore::InMemory#reserveto select the highest-priority eligible job while skipping concurrency-/rate-limited work. - Adds/updates specs and docs covering priority ordering and backpressure behavior.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/pages/runtime/workers.md | Documents priority ordering and that backpressure is configured on the queue store. |
| docs/pages/runtime/job-model.md | Documents new scheduling attributes on Karya::Job and default priority behavior. |
| docs/pages/reliability/backpressure.md | Documents PolicySet, concurrency caps, and rate-limit windows. |
| core/karya/spec/karya/worker_integration_spec.rb | Integration coverage for worker + in-memory store with new job attributes. |
| core/karya/spec/karya/queue_store/in_memory_reserve_spec.rb | Unit coverage for reserve ordering, priority, concurrency caps, and rate limits. |
| core/karya/spec/karya/queue_store/in_memory_initialization_spec.rb | Tests default initialization, default token generation, and policy_set validation. |
| core/karya/spec/karya/job/attributes_spec.rb | Tests defaults and validation for priority and policy keys. |
| core/karya/spec/karya/job_spec.rb | Tests job exposes new attributes and preserves them across transitions. |
| core/karya/spec/karya/backpressure/policy_set_spec.rb | Tests PolicySet normalization, lookup, and validation errors. |
| core/karya/sig/karya/queue_store/in_memory.rbs | Updates type surface for policy_set, rate limit admission state, and new helpers. |
| core/karya/sig/karya/job/attributes.rbs | Updates attribute hash typing for priority and policy keys. |
| core/karya/sig/karya/job.rbs | Adds new readers and initializer keywords for job scheduling attributes. |
| core/karya/sig/karya/backpressure.rbs | Adds RBS for backpressure policy types and PolicySet. |
| core/karya/lib/karya/queue_store/in_memory/store_state.rb | Extracts mutable internal store state, adds rate-limit admissions tracking. |
| core/karya/lib/karya/queue_store/in_memory/lease_duration.rb | Extracts lease duration normalization for reservations. |
| core/karya/lib/karya/queue_store/in_memory/handler_matcher.rb | Extracts handler subscription matching for reserve scans. |
| core/karya/lib/karya/queue_store/in_memory/backpressure_support.rb | Adds concurrency and rate-limit eligibility checks + admission pruning/recording. |
| core/karya/lib/karya/queue_store/in_memory.rb | Wires policy set, priority selection, and backpressure eligibility into reservation flow. |
| core/karya/lib/karya/job/attributes.rb | Adds normalization/validation for priority and optional policy keys. |
| core/karya/lib/karya/job.rb | Refactors job internals into identity/scheduling/lifecycle components and exposes new attrs. |
| core/karya/lib/karya/backpressure.rb | Implements policy classes, registry normalization, and lookup helpers. |
| core/karya/lib/karya.rb | Requires backpressure module at framework load time. |
- Added validation to ensure the period is a positive finite number in backpressure policies. - Updated job attributes to freeze internal component structs for immutability. - Modified in-memory queue store to include additional constants and improve job handling. - Enhanced specs to cover new validations and job behaviors, ensuring robustness in concurrency and rate-limiting scenarios.
- Added support for normalizing string attribute keys in policy hashes to ensure consistency. - Implemented error handling for unsupported policy attribute key types, raising an InvalidPolicyError for invalid inputs. - Updated documentation to clarify the behavior of rate-limit policies using rolling windows.
core/karya/lib/karya/queue_store/in_memory/backpressure_support.rb
Outdated
Show resolved
Hide resolved
- Updated the RateLimitPolicy documentation to reflect a rolling-window policy. - Refactored reserve maintenance logic to include pruning of stale rate-limit admissions. - Added tests to ensure proper functionality of rate-limit admission pruning. - Improved documentation for worker behavior regarding backpressure policies.
- Introduced ReserveScanState class to encapsulate backpressure state during reserve scans. - Refactored reserve method to utilize normalized reserve requests for cleaner logic. - Updated find_reserved_job and matching_job_for methods to leverage reserve scan state. - Removed outdated concurrency and rate limit checks from BackpressureSupport module. - Added test to ensure unconfigured rate-limit keys do not record admissions.
- Introduced ReserveScanState class to encapsulate backpressure state during reserve scans. - Added methods for initializing the reserve scan state and checking job conditions. - Updated existing methods to utilize ReserveScanState for improved concurrency handling. - Enhanced the reserve maintenance process to better manage job reservations and rate limits.
| def prune_stale_rate_limit_admissions(now) | ||
| state.rate_limit_admissions_by_key.dup.each_key do |rate_limit_key| | ||
| policy = policy_set.rate_limit_policy_for(rate_limit_key) | ||
| unless policy | ||
| state.delete_rate_limit_key(rate_limit_key) | ||
| next |
There was a problem hiding this comment.
prune_stale_rate_limit_admissions duplicates the entire rate_limit_admissions_by_key hash on every reserve/maintenance pass. If this hash grows, that copy can become a noticeable per-reserve overhead. Consider iterating over a snapshot of keys instead (e.g., state.rate_limit_admissions_by_key.keys.each) to avoid duplicating values/arrays while still allowing safe mutation of the underlying hash during the loop.
Karya::Backpressure::PolicySetto manage concurrency and rate-limiting policies.priority,concurrency_key, andrate_limit_keyattributes toKarya::Jobfor enhanced job management.Karya::QueueStore::InMemoryto prioritize jobs based on their attributes while reserving.closes: #39