feat: Asynq Fanout Mechanism and HYOK Refresh Frequency #191
jmpTeixeira02 wants to merge 11 commits into main from
Conversation
The PR introduces a fan-out mechanism for async task processing and increases HYOK sync frequency. Several substantive issues were found: a nil dereference on asynqTask before its nil check in BatchProcessor, a nil panic risk in TenantNameRefresher.Process when GetTenant errors, a duplicate log entry in KeystorePoolFiller, and a behavioral change where asynq.Timeout(0) silently disables timeouts for all tasks that don't configure one. The existing reviewer comments also raise important architectural points (optional FanOutHandler interface, error wrapping, duplicate child handler registration) that should be addressed.
Important: Review skipped. Auto reviews are disabled on this repository; check the settings in the CodeRabbit UI or the Run configuration.
Walkthrough
Adds tenant-aware async processing with fan-out: new BatchProcessor and fan-out primitives, tenant-scoped task handlers, config fields for task timeouts and fan-out, a worker API changed to accept a repository, and many task implementations/tests updated to use the tenant-aware flow.
Sequence Diagram(s)
sequenceDiagram
participant Worker as Task Worker
participant Async as Async Framework
participant Repo as Repository
participant BP as BatchProcessor
participant Queue as Task Queue
Worker->>Async: RunWorker(ctx, repo)
Async->>Repo: Query tenants (TenantQuery)
Repo-->>Async: tenant batches
Async->>BP: ProcessTenantsInBatch(ctx, task, processTenant)
loop per tenant / batch
BP->>BP: enrich ctx with tenant info
alt fan-out enabled
BP->>Queue: FanOutTask(childTask, tenant payload)
Queue-->>BP: enqueued child task
else direct execution
BP->>Async: handler.ProcessTask(enrichedCtx)
Async-->>BP: result
end
end
BP-->>Async: processing complete
Async-->>Worker: done
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (2 warnings)
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/async/tasks/send_notfication.go (1)
36-40: ⚠️ Potential issue | 🟠 Major
Return a non-retriable error for malformed payloads.
JSON unmarshal failures are permanent errors for that payload. Returning the raw error triggers unnecessary retries in asynq, wasting worker capacity. Wrap the error with asynq.SkipRetry to signal that this task should not be retried. Add the fmt import and update the error return:
Proposed fix

```diff
 import (
 	"context"
 	"encoding/json"
+	"fmt"

 	"github.com/hibiken/asynq"
```

```diff
 	err := json.Unmarshal(task.Payload(), &data)
 	if err != nil {
 		log.Error(ctx, "failed to unmarshal notification payload", err)
-		return err
+		return fmt.Errorf("%w: invalid notification payload: %v", asynq.SkipRetry, err)
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/tasks/send_notfication.go` around lines 36-40: the JSON unmarshal failure is a permanent, malformed-payload error and must not trigger retries; import fmt and wrap the returned error with asynq.SkipRetry (e.g., return fmt.Errorf("%w: %v", asynq.SkipRetry, err)), keeping the existing log.Error(ctx, "failed to unmarshal notification payload", err) call so the worker logs the issue but asynq won't retry the task.

internal/async/tasks/tenant/workflow_cleanup_test.go (1)
44-49: ⚠️ Potential issue | 🟡 Minor
This subtest no longer covers the nil-task path.
Line 48 passes the shared non-nil task, so the nil-input branch is no longer exercised and the test name/assertion text are now misleading. Keep nil here, or rename the subtest if nil is no longer supported.
💡 Minimal fix

```diff
-	err := cleaner.ProcessTask(context.Background(), task)
+	err := cleaner.ProcessTask(context.Background(), nil)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/tasks/tenant/workflow_cleanup_test.go` around lines 44-49: the subtest named "Should handle nil task parameter" no longer passes nil; either call cleaner.ProcessTask(context.Background(), nil) so the nil branch is actually exercised (keeping WorkflowRemovalMock, tasks.NewWorkflowCleaner, and the existing assert.NoError), or rename the subtest and assertion to reflect the non-nil case if nil is no longer supported.
🧹 Nitpick comments (10)
charts/cmk/values.yaml (1)
710-713: Nice example update; consider adding fanOutTask.timeOut for full parity.
To avoid ambiguity, include the child timeout in this commented example too, matching the dev values structure.
Suggested doc tweak

```diff
 # timeOut: 5s
 # fanOutTask:
 #   enabled: true
 #   retries: 0
+#   timeOut: 2s
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@charts/cmk/values.yaml` around lines 710-713: under the existing commented fanOutTask block (enabled, retries) add a commented timeOut entry with the same value/format used in the dev values so the example shows full parity with fanOutTask.timeOut.

internal/async/tasks/tenant/workflow_cleanup.go (1)
17-21: Unused processor field.
The processor field is initialized in NewWorkflowCleaner (line 31) but never used in ProcessTask or any other method. This appears to be dead code or an incomplete implementation. If batch processing is intended, ProcessTask should utilize the processor; otherwise, consider removing the unused field and its initialization.
♻️ If batch processing is not needed, remove the unused field

```diff
 type WorkflowCleaner struct {
 	workflowRemoval WorkflowRemoval
 	repo            repo.Repo
-	processor       *async.BatchProcessor
 }

 func NewWorkflowCleaner(
 	workflowRemoval WorkflowRemoval,
 	repo repo.Repo,
 	opts ...async.TaskOption,
 ) async.TenantTaskHandler {
 	wc := &WorkflowCleaner{
 		workflowRemoval: workflowRemoval,
 		repo:            repo,
-		processor:       async.NewBatchProcessor(repo),
 	}
```

Also applies to: 30-31, 41-43

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/tasks/tenant/workflow_cleanup.go` around lines 17-21: the WorkflowCleaner struct contains an unused processor field (*async.BatchProcessor) set in NewWorkflowCleaner but never referenced; either remove the field and its initialization (and any related imports) to eliminate dead code, or update ProcessTask to actually use the processor so the field is required.

internal/config/loader.go (1)
55-59: Duplicate TaskType entries are silently overwritten.
If cfg.Scheduler.Tasks contains multiple entries with the same TaskType, the later entry will silently replace the earlier one in taskMap. This could lead to unexpected configuration loss. Consider adding a check to detect and warn about (or reject) duplicate task types.
♻️ Proposed fix to detect duplicates

```diff
 // load taskMap
 cfg.Scheduler.taskMap = make(map[string]Task, len(cfg.Scheduler.Tasks))
 for _, task := range cfg.Scheduler.Tasks {
+	if _, exists := cfg.Scheduler.taskMap[task.TaskType]; exists {
+		slog.Warn("Duplicate task type in scheduler configuration, later entry will be used",
+			"taskType", task.TaskType)
+	}
 	cfg.Scheduler.taskMap[task.TaskType] = task
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/config/loader.go` around lines 55-59: the loop that populates cfg.Scheduler.taskMap from cfg.Scheduler.Tasks silently overwrites entries with the same TaskType; check whether cfg.Scheduler.taskMap[task.TaskType] already exists before assigning, and on duplicate either log a warning via the existing logger or return an error to abort loading, so duplicates are reported instead of silently replaced.

internal/async/tasks/tenant/hyok_sync_test.go (1)
17-29: Remove unused test fixtures.
errMockSyncHYOKClient and HyokHYOKClientMockFailed are no longer used since the failure test case was removed. Consider cleaning up this dead code.
🧹 Proposed cleanup

```diff
-var errMockSyncHYOKClient = errors.New("error syncing hyok client")
-
 type HyokHYOKClientMock struct{}

 func (s *HyokHYOKClientMock) SyncHYOKKeys(_ context.Context) error {
 	return nil
 }
-
-type HyokHYOKClientMockFailed struct{}
-
-func (s *HyokHYOKClientMockFailed) SyncHYOKKeys(_ context.Context) error {
-	return errMockSyncHYOKClient
-}
```

Also remove the unused errors import at line 5.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/tasks/tenant/hyok_sync_test.go` around lines 17-29: delete the unused error variable errMockSyncHYOKClient, the unused failing mock HyokHYOKClientMockFailed (and its SyncHYOKKeys method), and the now-unused errors import, keeping only HyokHYOKClientMock, which the remaining tests use.

internal/async/fanout_test.go (1)
7: Consider using github.com/stretchr/testify/assert for consistency.
Other test files in this PR and across the codebase use github.com/stretchr/testify/assert. Using github.com/zeebo/assert here introduces an inconsistency that could cause confusion during maintenance.
♻️ Proposed fix

```diff
-	"github.com/zeebo/assert"
+	"github.com/stretchr/testify/assert"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/fanout_test.go` at line 7: replace the github.com/zeebo/assert import with github.com/stretchr/testify/assert and update assertion calls to the testify signatures (e.g., assert.Equal(t, expected, actual), assert.NoError(t, err)), ensuring each call passes the *testing.T from the test function.

internal/config/tasks.go (1)
28-35: Consider operational implications of the 5-minute interval.
The task timeout (5 minutes) equals the cron interval. If the task occasionally runs longer than expected, the next scheduled run could start while the previous one is still processing. Asynq typically handles this with uniqueness constraints, but it is worth verifying the behavior aligns with expectations.
Also, using an explicit 3 for Retries instead of defaultRetryCount is inconsistent with other tasks in this map. Consider using defaultRetryCount for maintainability.
♻️ Proposed consistency fix

```diff
 TypeHYOKSync: {
 	Enabled:  true,
 	Cronspec: "*/5 * * * *", // Every 5 minutes
-	Retries:  3,
+	Retries:  defaultRetryCount,
 	TimeOut:  5 * time.Minute,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/config/tasks.go` around lines 28-35: Cronspec is "*/5 * * * *" and TimeOut is 5 * time.Minute (FanOutTask.TimeOut also 5 * time.Minute), so the task timeout equals the cron interval and risks overlapping runs; make the task TimeOut strictly less than the cron interval or verify that uniqueness/locking semantics prevent concurrent runs, and replace the hard-coded Retries: 3 with the shared defaultRetryCount constant for consistency.

internal/async/batch_processor.go (1)
135-139: Consider logging when batch processing encounters errors.
When repo.ProcessInBatch returns an error, it's returned directly without logging. For observability, consider logging the error before returning to help with debugging batch processing failures.
📊 Suggested logging

```diff
+	if err != nil {
+		log.Error(ctx, "Batch processing failed", err)
+		return err
+	}
+
 	if err == nil {
 		log.Info(ctx, "Task completed", slog.Int("totalTenantCount", totalTenantCount))
 	}
 	return err
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/batch_processor.go` around lines 135-139: when repo.ProcessInBatch returns an error it is currently returned without logging; add an error log before returning (with context, the error value, and useful fields such as totalTenantCount and any task identifier) so failures are observable.

internal/async/tasks/tenant/system_refresh.go (1)
62-68: Unused processor field.
Similar to TenantNameRefresher, the processor field is initialized but not used within this handler. The batch processing is handled externally in async.go. Consider removing the field or documenting why it's retained.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/tasks/tenant/system_refresh.go` around lines 62-68: the SystemsRefresher type contains an unused processor field which is initialized but never referenced; remove the field and its initialization, or add a comment documenting why it's retained. Update constructors and tests accordingly and run go vet/static checks to ensure no leftover references remain; keep the TenantQuery and FanOutFunc implementations unchanged.

internal/config/config.go (1)
128-141: Consider adding validation for TimeOut and FanOutTask fields.
The new Task.TimeOut and FanOutTask fields lack validation in Scheduler.Validate(). Consider validating that TimeOut is non-negative and that FanOutTask.Retries is within reasonable bounds if enabled.
💡 Suggested validation additions

```diff
 func (s *Scheduler) Validate() error {
 	checkedTasks := make(map[string]struct{}, len(s.Tasks))
 	for _, task := range s.Tasks {
 		_, found := PeriodicTasks[task.TaskType]
 		if !found {
 			return ErrNonDefinedTaskType
 		}

 		_, found = checkedTasks[task.TaskType]
 		if found {
 			return ErrRepeatedTaskType
 		}

+		if task.TimeOut < 0 {
+			return errors.New("task timeout cannot be negative")
+		}
+
+		if task.FanOutTask.Enabled && task.FanOutTask.Retries < 0 {
+			return errors.New("fan-out task retries cannot be negative")
+		}
+
 		checkedTasks[task.TaskType] = struct{}{}
 	}

 	return nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/config/config.go` around lines 128-141: add validation in Scheduler.Validate() to ensure Task.TimeOut and FanOutTask.TimeOut are non-negative and that, when FanOutTask.Enabled is true, FanOutTask.Retries falls within acceptable bounds, returning a clear validation error when any constraint is violated.

internal/async/tasks/tenant/tenant_name_refresh.go (1)
65-67: Clarify the intent of the TenantQuery filter.
The query filters for tenants where Name equals repo.Empty. This appears to target tenants with empty/unset names for refresh. Consider adding a brief comment to clarify this intent for future maintainers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/tasks/tenant/tenant_name_refresh.go` around lines 65-67: add a brief comment above TenantNameRefresher.TenantQuery explaining that the query intentionally selects tenant records whose Name field equals repo.Empty (i.e., tenants with empty or unset names) so they can be targeted for a name refresh.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@charts/cmk/values-dev.yaml`:
- Around line 629-636: The cronspec for the HYOK key sync in the dev values is
set to "@every 1h" but should be every 5 minutes; update the cronspec key
(cronspec) under the task block where taskType: key:sync to use "@every 5m" so
the cron will run every 5 minutes in dev.
In `@cmd/task-worker/main.go`:
- Around line 85-97: After successfully calling db.StartDBConnection and getting
dbCon, add a defer to always close the connection (e.g., defer dbCon.Close() or
the appropriate Close(ctx) method) immediately after the success check so dbCon
is released on all code paths (including failures in registerTasks and
cron.RunWorker and normal shutdown); locate the db.StartDBConnection call and
the dbCon variable in main.go and insert the defer right after the error check,
and if Close returns an error ensure it is handled or logged rather than
ignored.
In `@internal/async/async.go`:
- Around line 284-298: getFanOutOpts currently returns taskCfg.Enabled when
FanOutTask is enabled which is wrong; update the final return in the
getFanOutOpts function to return true, opts (i.e., return true, opts) so that
the presence of FanOutTask.Enabled controls the boolean result rather than the
parent task's Enabled flag; keep the existing assembly of opts (MaxRetry and
optional Timeout) unchanged.
- Around line 161-179: The closure passed to mux.HandleFunc inside the for
taskName, handler := range a.tasks loop captures the loop variables leading to
all handlers referencing the last iteration; fix by capturing loop vars in new
locals before creating the closure (e.g., localHandler := handler and
localTaskName := taskName) and use those locals inside the func passed to
mux.HandleFunc so TenantTaskHandler branches (TenantTaskHandler, h.TaskType(),
a.getFanOutOpts, NewBatchProcessor, WithFanOutTenants, WithTenantQuery,
h.ProcessTask) operate on the correct handler instance.
In `@internal/async/batch_processor_test.go`:
- Around line 49-57: The test currently expects bp.ProcessTenantsInBatch to
return nil even when the per-tenant callback returns errors; change
ProcessTenantsInBatch to continue processing remaining tenants but record any
per-tenant errors and return a non-nil aggregated error if any tenant failed.
Update the implementation referenced by ProcessTenantsInBatch (the batch loop
that invokes the tenant callback) to collect errors (e.g., append or count
failures) and after the loop return a combined error (or a sentinel error
indicating partial failure) instead of always returning nil; keep the behavior
of continuing execution for all tenants.
In `@internal/async/batch_processor.go`:
- Around line 113-130: The fan-out branch calls asynqTask.Payload() without
checking for nil; add a nil guard before using asynqTask (when bp.fanOut is
true) to avoid a panic: check if asynqTask == nil and handle it consistently
with the non-fanOut branch (e.g., log the error via log.Error(ctx, "nil task in
fan-out") and skip/continue or return an explicit error), then proceed to call
asyncUtils.NewTaskPayload(ctx, asynqTask.Payload()) and FanOutTask only when
asynqTask is non-nil.
In `@internal/async/tasks/tenant/system_refresh.go`:
- Around line 43-45: The empty Process method on SystemsRefresher is dead code
and should be removed; delete the func (s *SystemsRefresher) Process(ctx
context.Context, _ *asynq.Task) error { return nil } declaration so the type
only exposes the required TenantTaskHandler members (ProcessTask, TaskType,
TenantQuery, FanOutFunc) and avoid leaving unused methods like Process around.
In `@internal/async/tasks/tenant/tenant_name_refresh.go`:
- Around line 42-63: Remove the unused processor field from the
TenantNameRefresher struct and its initialization in NewTenantNameRefresher:
locate the TenantNameRefresher type declaration and delete the processor field,
then update NewTenantNameRefresher to stop creating or assigning a
BatchProcessor; ensure no other references to processor remain in methods like
ProcessTask (they shouldn't be used) and mirror the same change pattern used in
other handlers such as WorkflowExpiryProcessor and SystemsRefresher so
constructors and structs stay consistent.
In `@internal/async/tasks/tenant/workflow_expiry.go`:
- Around line 63-66: In the loop that calls w.expireWorkflow(ctx, wf.ID) (in
workflow_expiry.go), do not return immediately on the first error; instead call
expireWorkflow for each expired workflow, collect any errors (e.g., append to a
slice or use an error-aggregation helper), continue processing remaining
workflows, and after the loop return a single aggregated error if any failures
occurred (or nil if none); ensure you reference w.expireWorkflow and the
enclosing function so the change replaces the current immediate return with
error collection and final aggregated return.
In `@internal/log/log.go`:
- Around line 15-18: The helper WithLogInjectAttrs is passing the attrs slice as
a single argument to slogctx.With instead of expanding it; change the call
inside WithLogInjectAttrs to unpack attrs (use the variadic spread) so
slogctx.With receives the individual slog.Attr values (i.e., call
slogctx.With(ctx, attrs...) rather than slogctx.With(ctx, attrs)).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c123b574-4246-4be0-9eb1-40d1b9ffa4af
📒 Files selected for processing (43)
- charts/cmk/values-dev.yaml
- charts/cmk/values.yaml
- cmd/task-worker/main.go
- internal/async/async.go
- internal/async/batch_processor.go
- internal/async/batch_processor_test.go
- internal/async/fanout.go
- internal/async/fanout_test.go
- internal/async/mock_client.go
- internal/async/scheduler_task_config.go
- internal/async/tasks/batch_processor.go
- internal/async/tasks/hyok_sync.go
- internal/async/tasks/keystore_pool.go
- internal/async/tasks/keystore_pool_test.go
- internal/async/tasks/send_notfication.go
- internal/async/tasks/tenant/cert_rotation.go
- internal/async/tasks/tenant/cert_rotation_test.go
- internal/async/tasks/tenant/hyok_sync.go
- internal/async/tasks/tenant/hyok_sync_test.go
- internal/async/tasks/tenant/system_refresh.go
- internal/async/tasks/tenant/system_refresh_test.go
- internal/async/tasks/tenant/tenant_name_refresh.go
- internal/async/tasks/tenant/tenant_name_refresh_test.go
- internal/async/tasks/tenant/workflow_cleanup.go
- internal/async/tasks/tenant/workflow_cleanup_test.go
- internal/async/tasks/tenant/workflow_expiry.go
- internal/async/tasks/tenant/workflow_expiry_test.go
- internal/async/tasks/tenant_name_refresh.go
- internal/async/tasks/workflow.go
- internal/async/tasks/workflow_cleanup.go
- internal/async/tasks/workflow_expiry.go
- internal/config/config.go
- internal/config/loader.go
- internal/config/tasks.go
- internal/log/log.go
- internal/manager/key.go
- internal/manager/workflow_test.go
- test/integration/async_test/certrotation_test.go
- test/integration/async_test/hyoksync_test.go
- test/integration/async_test/keystorepool_test.go
- test/integration/async_test/systeminformation_test.go
- test/integration/async_test/workflowassign_test.go
- utils/async/async.go
💤 Files with no reviewable changes (5)
- internal/async/tasks/tenant_name_refresh.go
- internal/async/tasks/workflow_cleanup.go
- internal/async/tasks/workflow_expiry.go
- internal/async/tasks/hyok_sync.go
- internal/async/tasks/batch_processor.go
minh-nghia
left a comment
LGTM, but some AI suggestions can be considered 😁
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/async/async.go`:
- Around line 164-175: The TenantTaskHandler branch drops handler-specific
tenant filters when fan-out is disabled: keep h.TenantQuery() passed into
NewBatchProcessor even if enabled is false. In the case handling
TenantTaskHandler (symbols: TenantTaskHandler, getFanOutOpts, NewBatchProcessor,
WithFanOutTenants, WithTenantQuery, ProcessTenantsInBatch, TenantQuery,
h.ProcessTask), always construct the processor with
WithTenantQuery(h.TenantQuery()), and only add WithFanOutTenants(a.Client(),
opts...) when enabled; then call processor.ProcessTenantsInBatch(ctx, task,
h.ProcessTask) as before.
In `@internal/async/tasks/tenant/system_refresh.go`:
- Around line 41-53: The handler currently swallows context
cancellation/deadline errors, preventing asynq from retrying; update
SystemsRefresher.ProcessTask to treat context cancellation (context.Canceled)
and deadline exceeded (context.DeadlineExceeded) as retryable errors like
connection errors. Specifically, after calling s.systemClient.UpdateSystems(ctx)
check if err is non-nil and if errors.Is(err, context.Canceled) or
errors.Is(err, context.DeadlineExceeded) (in addition to isConnectionError(err))
then return the error so asynq will retry; otherwise log the error with
log.Error and return nil only when err == nil. Ensure you reference the
ProcessTask method, UpdateSystems call, and isConnectionError helper when making
the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 03eb7099-ac50-45d6-be17-c5dd3b1f2dc9
📒 Files selected for processing (12)
- charts/cmk/values-dev.yaml
- cmd/task-worker/main.go
- internal/async/async.go
- internal/async/batch_processor.go
- internal/async/errors.go
- internal/async/tasks/tenant/cert_rotation.go
- internal/async/tasks/tenant/hyok_sync.go
- internal/async/tasks/tenant/system_refresh.go
- internal/async/tasks/tenant/tenant_name_refresh.go
- internal/async/tasks/tenant/workflow_cleanup.go
- internal/async/tasks/tenant/workflow_expiry.go
- internal/log/log.go
🚧 Files skipped from review as they are similar to previous changes (3)
- internal/async/tasks/tenant/workflow_expiry.go
- internal/async/batch_processor.go
- internal/async/tasks/tenant/hyok_sync.go
```go
case TenantTaskHandler:
	enabled, opts := a.getFanOutOpts(h.TaskType())
	if enabled {
		processor := NewBatchProcessor(
			r,
			WithFanOutTenants(a.Client(), opts...),
			WithTenantQuery(h.TenantQuery()),
		)
		return processor.ProcessTenantsInBatch(ctx, task, h.ProcessTask)
	}
	processor := NewBatchProcessor(r)
	return processor.ProcessTenantsInBatch(ctx, task, h.ProcessTask)
```
Preserve TenantQuery() when fan-out is disabled.
NewBatchProcessor(r) falls back to repo.NewQuery(), so this branch ignores handler-specific filters. That makes selective handlers like TenantNameRefresher run against every tenant whenever fan-out is off.
Suggested fix
```diff
-	processor := NewBatchProcessor(r)
+	processor := NewBatchProcessor(
+		r,
+		WithTenantQuery(h.TenantQuery()),
+	)
 	return processor.ProcessTenantsInBatch(ctx, task, h.ProcessTask)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
case TenantTaskHandler:
	enabled, opts := a.getFanOutOpts(h.TaskType())
	if enabled {
		processor := NewBatchProcessor(
			r,
			WithFanOutTenants(a.Client(), opts...),
			WithTenantQuery(h.TenantQuery()),
		)
		return processor.ProcessTenantsInBatch(ctx, task, h.ProcessTask)
	}
	processor := NewBatchProcessor(
		r,
		WithTenantQuery(h.TenantQuery()),
	)
	return processor.ProcessTenantsInBatch(ctx, task, h.ProcessTask)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/async/async.go` around lines 164 - 175, The TenantTaskHandler branch
drops handler-specific tenant filters when fan-out is disabled: keep
h.TenantQuery() passed into NewBatchProcessor even if enabled is false. In the
case handling TenantTaskHandler (symbols: TenantTaskHandler, getFanOutOpts,
NewBatchProcessor, WithFanOutTenants, WithTenantQuery, ProcessTenantsInBatch,
TenantQuery, h.ProcessTask), always construct the processor with
WithTenantQuery(h.TenantQuery()), and only add WithFanOutTenants(a.Client(),
opts...) when enabled; then call processor.ProcessTenantsInBatch(ctx, task,
h.ProcessTask) as before.
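The fix described in the prompt hinges on the functional-options pattern: build the option slice with the tenant filter unconditionally, and append the fan-out option only when enabled. A minimal sketch, where `BatchProcessor`, `WithTenantQuery`, and `WithFanOutTenants` are simplified stand-ins for the real types (option bodies assumed):

```go
package main

import "fmt"

// BatchProcessor is a stand-in for the reviewed type; only the two
// fields relevant to this comment are modeled.
type BatchProcessor struct {
	tenantQuery string
	fanOut      bool
}

type Option func(*BatchProcessor)

// WithTenantQuery sets the handler-specific tenant filter.
func WithTenantQuery(q string) Option {
	return func(bp *BatchProcessor) { bp.tenantQuery = q }
}

// WithFanOutTenants enables fan-out (simplified: no client/opts here).
func WithFanOutTenants() Option {
	return func(bp *BatchProcessor) { bp.fanOut = true }
}

func NewBatchProcessor(opts ...Option) *BatchProcessor {
	bp := &BatchProcessor{tenantQuery: "all"} // default: every tenant
	for _, o := range opts {
		o(bp)
	}
	return bp
}

func main() {
	enabled := false
	// Always keep the handler's tenant filter; only fan-out is conditional.
	opts := []Option{WithTenantQuery("status = active")}
	if enabled {
		opts = append(opts, WithFanOutTenants())
	}
	bp := NewBatchProcessor(opts...)
	fmt.Println(bp.tenantQuery, bp.fanOut)
}
```

Structuring the branch this way makes it impossible to drop the tenant filter on one path, which is exactly the bug the review points out.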
```diff
 func (s *SystemsRefresher) ProcessTask(ctx context.Context, task *asynq.Task) error {
-	log.Info(ctx, "Starting Systems Refresh Task")
-
-	err := s.processor.ProcessTenantsInBatch(
-		ctx,
-		"Systems Refresh",
-		task,
-		repo.NewQuery(),
-		func(tenantCtx context.Context, tenant *model.Tenant, index int) error {
-			log.Debug(tenantCtx, "Refreshing systems for tenant",
-				slog.String("schemaName", tenant.SchemaName), slog.Int("index", index))
-
-			updateErr := s.systemClient.UpdateSystems(ctx)
-			// If network error return an error triggering
-			// another task attempt with a backoff
-			if isConnectionError(updateErr) {
-				return updateErr
-			}
-
-			if updateErr != nil {
-				log.Error(ctx, "Running Refresh System Task", updateErr)
-			}
-			return nil
-		})
-	if err != nil {
-		log.Error(ctx, "Error during systems refresh batch processing", err)
-		return errs.Wrap(ErrRunningTask, err)
-	}
+	err := s.systemClient.UpdateSystems(ctx)
+
+	// If network error return an error triggering
+	// another task attempt with a backoff
+	if isConnectionError(err) {
+		return err
+	}
+
+	if err != nil {
+		log.Error(ctx, "Running Refresh System Task", err)
+	}
+	return nil
```
🧩 Analysis chain
🏁 Scripts executed (repository: openkcm/cmk):

```shell
# First, let's find and examine the file in question
find . -type f -name "system_refresh.go" | head -20

# Let's also check the structure of the async tasks directory
find . -type f -path "*/internal/async/tasks/tenant/*" | head -20

# Search for isConnectionError definition
rg -n "func isConnectionError" --type go

# Search for SystemsRefresher and ProcessTask implementation
rg -n "type SystemsRefresher\|func.*ProcessTask" --type go -A 5

# Let's read the system_refresh.go file to see the full implementation
cat -n ./internal/async/tasks/tenant/system_refresh.go

# Let's check what errors context cancellation produces
rg -n "context.Canceled\|context.DeadlineExceeded" --type go | head -20

# Let's check asynq documentation and behavior - search for how asynq treats return values
rg -n "return nil\|return err" ./internal/async/tasks/tenant/system_refresh.go -B 2 -A 2

# Let's find implementations of UpdateSystems interface
rg -n "func.*UpdateSystems" --type go -A 10 | head -50

# Search for other handlers in the same directory to see how they handle errors
ls -la ./internal/async/tasks/tenant/

# Let's check one of the other task handlers to see their pattern
cat -n ./internal/async/tasks/tenant/cert_rotation.go | head -80

# Let's check the test file for system_refresh to understand expected behavior
cat -n ./internal/async/tasks/tenant/system_refresh_test.go

# Let's look for how context errors are handled elsewhere
rg -n "context.Canceled\|context.DeadlineExceeded" ./internal/async/tasks/ --type go

# Check how other task handlers handle errors - let's look at hyok_sync
cat -n ./internal/async/tasks/tenant/hyok_sync.go

# Check asynq documentation - look for how it handles errors in handlers
rg -n "github.com/hibiken/asynq" . --type go -A 3 | grep -E "(Error|retry|backoff)" | head -20

# Let's check the ProcessInBatch implementation to understand error propagation
rg -n "func.*ProcessInBatch" --type go -A 15

# Let's look at how context cancellation is handled in the codebase
rg -n "defer.*cancel\|ctx.*Done\|cancel()" ./internal/manager/systeminformation.go -B 2 -A 2

# Check if there are any tests that verify context cancellation behavior
rg -n "context.Background\|context.WithCancel\|context.WithTimeout\|context.WithDeadline" ./internal/async/tasks/tenant/ --type go

# Let's search for how asynq handles errors - check the async package
cat -n ./internal/async/*.go | head -100

# Let's check the ProcessInBatchWithOptions implementation more carefully
cat -n ./internal/repo/repository.go | sed -n '264,310p'

# Check if there are examples of checking context.Canceled or context.DeadlineExceeded errors
rg -n "errors.Is.*context\\.Canceled\|errors.Is.*context\\.DeadlineExceeded" --type go

# Let's look for imports of "errors" package in the codebase to see how they handle error checking
rg -n "^import.*errors" ./internal/async/tasks/tenant/ --type go

# Let's check what errors UpdateSystems could return by looking at the updateSystem implementation
rg -n "func.*updateSystem" ./internal/manager/ --type go -A 20 | head -60

# Let's check how the repository's List function handles context cancellation
rg -n "func.*List.*context" ./internal/repo/ --type go -A 10 | head -50

# Let's look at the actual test to understand what behavior is expected
cat -n ./internal/async/tasks/tenant/system_refresh_test.go

# Check if there are any tests for context cancellation in other task handlers
rg -n "WithCancel\|WithTimeout\|context.Canceled\|context.DeadlineExceeded" ./internal/async/ --type go

# Let's check how errors from context are typically handled in the codebase
rg -n "context.Canceled\|context.DeadlineExceeded" . --type go

# Let's check if there are other places where context errors are checked
rg -n "errors.Is" ./internal/ --type go -A 2 | head -40

# Let's check the status code for context.Canceled
rg -n "codes.Canceled" . --type go

# Let's verify what error codes gRPC returns - check if DeadlineExceeded in gRPC matches context.DeadlineExceeded
rg -n "codes\\.DeadlineExceeded" . --type go

# Let's look for any handling of context cancellation in the manager
cat -n ./internal/manager/systeminformation.go | head -100

# Verify that repo.List is indeed called with the task context
rg -n "repo.List" ./internal/repo/repository.go -B 2 -A 2 | head -20

# Double-check asynq's expected behavior with returning errors vs nil
web_search asynq library golang error handling return nil vs return error retry
```
🌐 Web query:
asynq golang error handling return nil vs return error task retry behavior
💡 Result:
In hibiken/asynq, the retry/archiving behavior is driven entirely by what your `Handler.ProcessTask(...)` error returns:
- Return `nil` ⇒ the task is treated as a success and won't be retried. The handler contract is "return `nil` on success, non-`nil` on failure." [1]
- Return a non-`nil` error (or panic) ⇒ the task is treated as failed and will be retried after a delay while retry count remains; once retries are exhausted, it's archived. [1]
- Return `asynq.SkipRetry` (or an error that wraps it) ⇒ no retries; the task is archived immediately, even if retries remain. [2]
- Return `asynq.RevokeTask` (or wrap it) ⇒ the task is neither retried nor archived. [2]

Retry configuration knobs (server/task options):
- Max retries: default is 25 unless overridden (e.g., `asynq.MaxRetry(n)`). [3]
- Retry delay: controlled by `Config.RetryDelayFunc` (default is exponential backoff). [4]
- "Return error but don't consume retry count": set `Config.IsFailure(err) bool`. If it returns false, asynq won't increment the task's retried counter or failure stats for that error (useful for "try again later" conditions). [5]

Go gotcha that affects retries: make sure you return a true nil error interface. Returning a typed nil (e.g., `var e *MyErr = nil; return e`) becomes a non-nil error interface and will trigger retries.

Sources:
- Asynq `Handler` docs (retry on non-nil error/panic; archive after retries exhausted). (pkg.go.dev)
- `SkipRetry`/`RevokeTask` semantics. (pkg.go.dev)
- Default max retry = 25 (task processing options docs). (pkg.go.dev)
- `RetryDelayFunc` definition and purpose. (pkg.go.dev)
- `IsFailure` behavior (don't increment retried counter when false). (pkg.go.dev)
Citations:
- 1: https://pkg.go.dev/github.com/hibiken/asynq
- 2: https://pkg.go.dev/github.com/hibiken/asynq
- 3: https://pkg.go.dev/github.com/hibiken/asynq
- 4: https://pkg.go.dev/github.com/hibiken/asynq
- 5: https://pkg.go.dev/github.com/hibiken/asynq
Handle context cancellation errors to trigger retries.
When UpdateSystems exits due to task context cancellation or deadline, the current code logs the error and returns nil, marking the task as successful. Asynq only retries when the handler returns a non-nil error; returning nil prevents the interrupted task from being retried.
Suggested fix

```diff
 import (
 	"context"
+	"errors"

 	"github.com/hibiken/asynq"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@
-	if isConnectionError(err) {
+	if errors.Is(err, context.Canceled) ||
+		errors.Is(err, context.DeadlineExceeded) ||
+		isConnectionError(err) {
 		return err
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/async/tasks/tenant/system_refresh.go` around lines 41 - 53, The
handler currently swallows context cancellation/deadline errors, preventing
asynq from retrying; update SystemsRefresher.ProcessTask to treat context
cancellation (context.Canceled) and deadline exceeded (context.DeadlineExceeded)
as retryable errors like connection errors. Specifically, after calling
s.systemClient.UpdateSystems(ctx) check if err is non-nil and if errors.Is(err,
context.Canceled) or errors.Is(err, context.DeadlineExceeded) (in addition to
isConnectionError(err)) then return the error so asynq will retry; otherwise log
the error with log.Error and return nil only when err == nil. Ensure you
reference the ProcessTask method, UpdateSystems call, and isConnectionError
helper when making the change.
e44e00a to 17ace74 (Compare)
ad74173 to 4058843 (Compare)
Actionable comments posted: 1
♻️ Duplicate comments (1)
internal/async/tasks/tenant/system_refresh.go (1)
41-54: ⚠️ Potential issue | 🟠 Major
Context cancellation errors are still swallowed, preventing retries.
When `UpdateSystems` fails due to task context cancellation (`context.Canceled`) or deadline exceeded (`context.DeadlineExceeded`), the current code logs the error and returns `nil`, marking the task as successful. Asynq only retries when the handler returns a non-nil error. The `isConnectionError` helper only checks gRPC status codes, not Go context errors.
Additionally, this error handling pattern is inconsistent with `cert_rotation.go`, which propagates all errors via `handleErrorTenants`.
Suggested fix

```diff
+import (
+	"context"
+	"errors"
+
+	"github.com/hibiken/asynq"
 ...
 func (s *SystemsRefresher) ProcessTask(ctx context.Context, task *asynq.Task) error {
 	err := s.systemClient.UpdateSystems(ctx)

-	// If network error return an error triggering
-	// another task attempt with a backoff
-	if isConnectionError(err) {
+	// If context cancelled, deadline exceeded, or network error,
+	// return error to trigger retry with backoff
+	if errors.Is(err, context.Canceled) ||
+		errors.Is(err, context.DeadlineExceeded) ||
+		isConnectionError(err) {
 		return err
 	}

 	if err != nil {
 		log.Error(ctx, "Running Refresh System Task", err)
 	}
 	return nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/tasks/tenant/system_refresh.go` around lines 41 - 54, The ProcessTask in SystemsRefresher currently swallows context cancellation/deadline errors; modify error handling in SystemsRefresher.ProcessTask (around the call to s.systemClient.UpdateSystems) so that context.Canceled and context.DeadlineExceeded are treated as retryable like gRPC connection errors: if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) (or extend isConnectionError to include those), return the error to trigger an asynq retry; for non-retryable failures, forward the error to the same handler used by cert_rotation.go (handleErrorTenants(ctx, err)) or log consistently, then return nil. Ensure you reference s.systemClient.UpdateSystems, isConnectionError (or new isRetryableError helper), SystemsRefresher.ProcessTask, and handleErrorTenants when making the change.
🧹 Nitpick comments (4)
internal/async/batch_processor.go (2)
76-94: Clarify the warning message when tenant IDs list is empty.
Line 92's warning "Failed to parse tenant IDs from task payload" is misleading: parsing succeeded but the list was empty. This could confuse operators reviewing logs.
✏️ Suggested message clarification

```diff
 if len(payload.TenantIDs) > 0 {
 	log.Info(
 		ctx,
 		"Processing specified tenants",
 		slog.Int("tenantCount", len(payload.TenantIDs)),
 	)
 	tenantIDs = payload.TenantIDs
 } else {
-	log.Warn(ctx, "Failed to parse tenant IDs from task payload, processing all tenants")
+	log.Warn(ctx, "No tenant IDs specified in payload, processing all tenants")
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/batch_processor.go` around lines 76 - 94, The warning message is misleading because ParseTenantListPayload succeeded but returned an empty TenantIDs slice; update the log message in the block that inspects payload.TenantIDs (after calling asyncUtils.ParseTenantListPayload on asynqTask.Payload()) to clearly indicate that the payload parsed successfully but contained no tenant IDs (reference variables/functions: tenantIDs, payload.TenantIDs, asyncUtils.ParseTenantListPayload, asynqTask.Payload, and log.Warn) so operators know you’re defaulting to processing all tenants rather than encountering a parse error.
120-137: Note asymmetric error handling between fan-out and inline modes.
In fan-out mode (lines 134-136), an enqueue failure immediately returns the error and stops batch processing. In inline mode (lines 122-124), a processing error is logged but the loop continues with the next tenant.
This asymmetry may be intentional — fan-out failures suggest infrastructure issues warranting a full stop, while inline failures are tenant-specific and shouldn't block others. If this is the intended behavior, a brief code comment would help future maintainers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/batch_processor.go` around lines 120 - 137, The branch handling for bp.fanOut is asymmetric: processTenant(ctx, asynqTask) logs errors and continues, while FanOutTask(...) returns the error and stops processing; decide which behavior you want and implement it consistently or document the intent. Either change the FanOutTask error path to log the error and continue (matching processTenant) or change the inline path to return the error (matching FanOutTask), updating the code paths around processTenant, FanOutTask, bp.asyncClient, asyncUtils.NewTaskPayload, and bp.fanOutOpts accordingly; if the current asymmetry is intentional, add a concise comment near the bp.fanOut branch explaining why fan-out failures should abort while inline failures should not.
internal/config/config.go (1)
127-141: Consider adding validation for timeout values.
The new `TimeOut` fields in `Task` and `FanOutTask` could benefit from validation to ensure non-negative durations. Negative timeouts would cause unexpected behavior with asynq options.
♻️ Optional validation in Scheduler.Validate()

```diff
 func (s *Scheduler) Validate() error {
 	checkedTasks := make(map[string]struct{}, len(s.Tasks))
 	for _, task := range s.Tasks {
 		_, found := PeriodicTasks[task.TaskType]
 		if !found {
 			return ErrNonDefinedTaskType
 		}
 		_, found = checkedTasks[task.TaskType]
 		if found {
 			return ErrRepeatedTaskType
 		}
+		if task.TimeOut < 0 {
+			return errors.New("task timeout must not be negative")
+		}
+		if task.FanOutTask.TimeOut < 0 {
+			return errors.New("fan-out task timeout must not be negative")
+		}
 		checkedTasks[task.TaskType] = struct{}{}
 	}
 	return nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/config/config.go` around lines 127 - 141, The Task and FanOutTask structs now include TimeOut fields which need validation to prevent negative durations; add checks in the configuration validation path (e.g., Scheduler.Validate or Config.Validate) to ensure Task.TimeOut and FanOutTask.TimeOut are >= 0 (and optionally non-zero if zero is disallowed), returning a descriptive error referencing the task (use the Task struct and FanOutTask struct names) when invalid; update any unmarshalling or defaults logic that sets TimeOut to enforce a safe default and include tests covering negative, zero, and positive TimeOut values.
internal/async/tasks/tenant/hyok_sync.go (1)
17-20: Remove the unused `repo` field.
The `repo` field is stored in the struct but never used in any method. This is dead code.
♻️ Proposed fix

```diff
 type HYOKSync struct {
 	hyokClient HYOKUpdater
-	repo       repo.Repo
 }

 func NewHYOKSync(
 	hyokClient HYOKUpdater,
-	repo repo.Repo,
 	opts ...async.TaskOption,
 ) async.TenantTaskHandler {
 	h := &HYOKSync{
 		hyokClient: hyokClient,
-		repo:       repo,
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/async/tasks/tenant/hyok_sync.go` around lines 17 - 20, The HYOKSync struct contains an unused field repo of type repo.Repo; remove this dead field from the struct definition (identify HYOKSync and the repo field) and update any constructors or initializers that set repo (e.g., where HYOKSync is instantiated) to stop passing or storing repo.Repo so the struct only holds the needed hyokClient HYOKUpdater.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/async/fanout.go`:
- Around line 38-41: The type name FunOutFunc is a typo and should be renamed to
FanOutFunc to match related identifiers like FanOutTask and FanOutHandler;
update the type declaration (replace FunOutFunc with FanOutFunc) and then update
all references/usages across the codebase (e.g., any function signatures,
variables, method returns and handlers that currently reference FunOutFunc or
return FanOutFunc()) to use FanOutFunc so compilation and naming consistency are
restored.
---
Duplicate comments:
In `@internal/async/tasks/tenant/system_refresh.go`:
- Around line 41-54: The ProcessTask in SystemsRefresher currently swallows
context cancellation/deadline errors; modify error handling in
SystemsRefresher.ProcessTask (around the call to s.systemClient.UpdateSystems)
so that context.Canceled and context.DeadlineExceeded are treated as retryable
like gRPC connection errors: if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) (or extend isConnectionError to include
those), return the error to trigger an asynq retry; for non-retryable failures,
forward the error to the same handler used by cert_rotation.go
(handleErrorTenants(ctx, err)) or log consistently, then return nil. Ensure you
reference s.systemClient.UpdateSystems, isConnectionError (or new
isRetryableError helper), SystemsRefresher.ProcessTask, and handleErrorTenants
when making the change.
---
Nitpick comments:
In `@internal/async/batch_processor.go`:
- Around line 76-94: The warning message is misleading because
ParseTenantListPayload succeeded but returned an empty TenantIDs slice; update
the log message in the block that inspects payload.TenantIDs (after calling
asyncUtils.ParseTenantListPayload on asynqTask.Payload()) to clearly indicate
that the payload parsed successfully but contained no tenant IDs (reference
variables/functions: tenantIDs, payload.TenantIDs,
asyncUtils.ParseTenantListPayload, asynqTask.Payload, and log.Warn) so operators
know you’re defaulting to processing all tenants rather than encountering a
parse error.
- Around line 120-137: The branch handling for bp.fanOut is asymmetric:
processTenant(ctx, asynqTask) logs errors and continues, while FanOutTask(...)
returns the error and stops processing; decide which behavior you want and
implement it consistently or document the intent. Either change the FanOutTask
error path to log the error and continue (matching processTenant) or change the
inline path to return the error (matching FanOutTask), updating the code paths
around processTenant, FanOutTask, bp.asyncClient, asyncUtils.NewTaskPayload, and
bp.fanOutOpts accordingly; if the current asymmetry is intentional, add a
concise comment near the bp.fanOut branch explaining why fan-out failures should
abort while inline failures should not.
In `@internal/async/tasks/tenant/hyok_sync.go`:
- Around line 17-20: The HYOKSync struct contains an unused field repo of type
repo.Repo; remove this dead field from the struct definition (identify HYOKSync
and the repo field) and update any constructors or initializers that set repo
(e.g., where HYOKSync is instantiated) to stop passing or storing repo.Repo so
the struct only holds the needed hyokClient HYOKUpdater.
In `@internal/config/config.go`:
- Around line 127-141: The Task and FanOutTask structs now include TimeOut
fields which need validation to prevent negative durations; add checks in the
configuration validation path (e.g., Scheduler.Validate or Config.Validate) to
ensure Task.TimeOut and FanOutTask.TimeOut are >= 0 (and optionally non-zero if
zero is disallowed), returning a descriptive error referencing the task (use the
Task struct and FanOutTask struct names) when invalid; update any unmarshalling
or defaults logic that sets TimeOut to enforce a safe default and include tests
covering negative, zero, and positive TimeOut values.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4ad18cb7-522e-4b74-a060-b99d1bbc4e5c
📒 Files selected for processing (44)
- charts/cmk/values-dev.yaml
- charts/cmk/values.yaml
- cmd/task-worker/main.go
- internal/async/async.go
- internal/async/batch_processor.go
- internal/async/batch_processor_test.go
- internal/async/errors.go
- internal/async/fanout.go
- internal/async/fanout_test.go
- internal/async/mock_client.go
- internal/async/scheduler_task_config.go
- internal/async/tasks/batch_processor.go
- internal/async/tasks/hyok_sync.go
- internal/async/tasks/keystore_pool.go
- internal/async/tasks/keystore_pool_test.go
- internal/async/tasks/send_notfication.go
- internal/async/tasks/tenant/cert_rotation.go
- internal/async/tasks/tenant/cert_rotation_test.go
- internal/async/tasks/tenant/hyok_sync.go
- internal/async/tasks/tenant/hyok_sync_test.go
- internal/async/tasks/tenant/system_refresh.go
- internal/async/tasks/tenant/system_refresh_test.go
- internal/async/tasks/tenant/tenant_name_refresh.go
- internal/async/tasks/tenant/tenant_name_refresh_test.go
- internal/async/tasks/tenant/workflow_cleanup.go
- internal/async/tasks/tenant/workflow_cleanup_test.go
- internal/async/tasks/tenant/workflow_expiry.go
- internal/async/tasks/tenant/workflow_expiry_test.go
- internal/async/tasks/tenant_name_refresh.go
- internal/async/tasks/workflow.go
- internal/async/tasks/workflow_cleanup.go
- internal/async/tasks/workflow_expiry.go
- internal/config/config.go
- internal/config/loader.go
- internal/config/tasks.go
- internal/log/log.go
- internal/manager/key.go
- internal/manager/workflow_test.go
- test/integration/async_test/certrotation_test.go
- test/integration/async_test/hyoksync_test.go
- test/integration/async_test/keystorepool_test.go
- test/integration/async_test/systeminformation_test.go
- test/integration/async_test/workflowassign_test.go
- utils/async/async.go
💤 Files with no reviewable changes (5)
- internal/async/tasks/batch_processor.go
- internal/async/tasks/tenant_name_refresh.go
- internal/async/tasks/hyok_sync.go
- internal/async/tasks/workflow_cleanup.go
- internal/async/tasks/workflow_expiry.go
✅ Files skipped from review due to trivial changes (10)
- internal/async/tasks/keystore_pool_test.go
- internal/async/errors.go
- charts/cmk/values.yaml
- test/integration/async_test/keystorepool_test.go
- internal/manager/key.go
- utils/async/async.go
- internal/async/fanout_test.go
- test/integration/async_test/systeminformation_test.go
- internal/async/batch_processor_test.go
- test/integration/async_test/hyoksync_test.go
🚧 Files skipped from review as they are similar to previous changes (18)
- internal/manager/workflow_test.go
- internal/async/tasks/workflow.go
- internal/async/tasks/tenant/workflow_expiry_test.go
- internal/async/scheduler_task_config.go
- test/integration/async_test/certrotation_test.go
- internal/config/loader.go
- charts/cmk/values-dev.yaml
- internal/log/log.go
- internal/async/tasks/tenant/workflow_cleanup_test.go
- internal/async/tasks/tenant/hyok_sync_test.go
- internal/async/tasks/tenant/tenant_name_refresh_test.go
- internal/async/tasks/send_notfication.go
- test/integration/async_test/workflowassign_test.go
- internal/config/tasks.go
- internal/async/tasks/keystore_pool.go
- internal/async/tasks/tenant/workflow_expiry.go
- internal/async/tasks/tenant/workflow_cleanup.go
- internal/async/async.go
```go
type (
	ProcessFunc func(ctx context.Context, task *asynq.Task) error
	FunOutFunc  func(ctx context.Context, task *asynq.Task, f ProcessFunc) error
)
```
Typo in type alias name: FunOutFunc should be FanOutFunc.
The type alias FunOutFunc appears to be a typo — it should likely be FanOutFunc to match the naming convention used elsewhere (e.g., FanOutTask, FanOutHandler).
✏️ Proposed fix

```diff
 type (
 	ProcessFunc func(ctx context.Context, task *asynq.Task) error
-	FunOutFunc func(ctx context.Context, task *asynq.Task, f ProcessFunc) error
+	FanOutFunc func(ctx context.Context, task *asynq.Task, f ProcessFunc) error
 )
```

Note: This will require updating references in other files (e.g., `FanOutFunc()` method returns in task handlers).
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```go
type (
	ProcessFunc func(ctx context.Context, task *asynq.Task) error
	FanOutFunc  func(ctx context.Context, task *asynq.Task, f ProcessFunc) error
)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/async/fanout.go` around lines 38-41, the type name FunOutFunc is a
typo and should be renamed to FanOutFunc to match related identifiers like
FanOutTask and FanOutHandler. Update the type declaration (replace FunOutFunc
with FanOutFunc), then update all references across the codebase (function
signatures, variables, method returns, and task handlers that currently
reference FunOutFunc) so that compilation and naming consistency are restored.