Skip to content

Multithreading queue#108

Open
dahn510 wants to merge 60 commits intomainfrom
multi-queue
Open

Multithreading queue#108
dahn510 wants to merge 60 commits intomainfrom
multi-queue

Conversation

@dahn510
Copy link
Copy Markdown
Member

@dahn510 dahn510 commented Apr 10, 2025

Users can add tx messages to the Queue msg pool.
At every interval, messages in the pool is distributed to free workers.

Summary by CodeRabbit

  • New Features

    • Introduced a high-performance worker pool system for concurrent transaction processing with on-chain authorization management.
    • Added support for a "test mode" flag to run the provider without connecting to the chain network.
    • Enhanced queue configuration options, allowing fine-tuning of batch size, retry attempts, and concurrency.
    • Added creation of offset wallets for improved concurrency in queue and stray manager components.
  • Improvements

    • Refactored queue and proof handling for better scalability, reliability, and concurrency safety.
    • Streamlined configuration management with a new structured queue configuration section.
    • Centralized use of query clients across components for consistent blockchain querying.
    • Enhanced logging with detailed context in proof generation and transaction broadcasting.
    • Upgraded dependencies and Go toolchain for improved performance and compatibility.
    • Simplified file download and stray manager workflows with improved query client usage.
    • Updated server startup to run asynchronously and improved error logging.
    • Improved concurrency control with atomic operations in proof handling.
    • Replaced legacy queue implementation with a new interface and worker-based pool design.
    • Unified method signatures to consistently accept external query clients for blockchain queries.
  • Bug Fixes

    • Improved error handling and concurrency safety in transaction and proof processing.
    • Fixed deferred resource closing to properly log errors.
  • Tests

    • Added comprehensive unit and benchmark tests for the new worker pool and queue system.
    • Introduced extensive test utilities and mocks for robust testing scenarios.
  • Chores

    • Updated and reorganized codebase for maintainability, including removal of legacy queue implementation.

@dahn510 dahn510 self-assigned this Apr 10, 2025
Copy link
Copy Markdown
Contributor

@0foxes 0foxes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new code looks good, but i think you need to change some existing code which breaks from your changes to queue/queue.go

when i try to build it i get
image

@dahn510 dahn510 marked this pull request as ready for review June 4, 2025 01:35
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Jun 6, 2025

Walkthrough

This update restructures queue management and query client usage throughout the codebase. It introduces a worker pool-based queue system with batching and retry logic, centralizes the use of a QueryClient interface, updates method signatures to inject query clients, and adds extensive test infrastructure with mocks and fakes. Configuration for queue processing is expanded, concurrency management is improved, and server startup is made asynchronous with error handling.

Changes

File(s) Change Summary
queue/pool.go, queue/worker.go, queue/pool_test.go, queue/pool_bench_test.go, queue/worker_test.go Introduced a new worker pool-based queue system, including worker management, batching, retry logic, and comprehensive tests/benchmarks for pool and worker behaviors.
queue/queue.go, queue/types.go Removed the old queue implementation, replaced with a Queue interface and a new error variable for max retry attempts.
core/app.go, api/server.go, api/client.go, api/file_handler.go, network/downloads.go, proofs/proofs.go, proofs/types.go, strays/hands.go, strays/manager.go, strays/types.go Refactored to inject and use a QueryClient interface throughout, updated method signatures and struct fields, improved concurrency and logging, centralized query client usage, and updated wallet offset handling.
config/types.go Replaced single queue interval config with a QueueConfig struct, added defaults, updated logging and serialization, and changed key serialization method.
wallet/wallet.go Added CreateOffsetWallets to generate multiple offset wallets for worker pool usage.
cmd/start.go Added a test_mode flag and support for test mode in app initialization.
go.mod Upgraded Go version and refreshed many dependencies.
testutil/fake.go, testutil/txBuilder.go, testutil/mocks/expected_mocks.go, testutil/mocks/mocks.go, testutil/mocks/setup.go Added comprehensive fake and mock implementations for query, tx, and RPC clients, as well as test utilities and setup helpers, to support testing without live network dependencies.
queue/monitoring.go Suppressed unused variable warning for queueSize.
file_system/file_system.go Improved error handling and logging on resource closing in file data retrieval.
.gitignore Added patterns to ignore backup files (~) and macOS .DS_Store files.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant API
    participant Pool
    participant Worker
    participant Wallet
    participant QueryClient

    Client->>API: HTTP Request (e.g., PostFile)
    API->>QueryClient: Query file/provider info
    API->>Pool: Add message (e.g., MsgPostProof)
    Pool->>Worker: Dispatch message
    Worker->>Wallet: BroadcastTxCommit (batched)
    Wallet->>QueryClient: (if needed, e.g., for account info)
    Worker->>Pool: Report result
    Pool->>API: Return result
    API->>Client: HTTP Response
Loading

Possibly related PRs

  • Fixing hand work & adding gateways #125: Related changes in strays/hands.go and strays/manager.go focusing on queue integration and hand management, though this PR further introduces QueryClient injection and removes wallet cloning and authorization logic.

Poem

In the warren, queues were slow,
So rabbits built a pool to go!
Workers hop and batches fly,
QueryClients now unify.
Offset wallets multiply,
Tests and mocks—oh my, oh my!
With every hop, the code runs spry. 🐇✨


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 28579bc and 9130089.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (2)
  • go.mod (10 hunks)
  • network/downloads.go (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • go.mod
🚧 Files skipped from review as they are similar to previous changes (1)
  • network/downloads.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: lint
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 13

🔭 Outside diff range comments (4)
api/server.go (1)

66-67: ⚠️ Potential issue

Fix undefined variable reference.

The code still references myIp variable on lines 66-67, but this parameter was removed from the Serve method signature. This will cause a compilation error.

-		outline.RegisterGetRoute(r, "/get/{merkle}/{path:.*}", FindFileHandler(f, wallet, myIp))
-		outline.RegisterGetRoute(r, "/get/{merkle}", FindFileHandler(f, wallet, myIp))
+		outline.RegisterGetRoute(r, "/get/{merkle}/{path:.*}", FindFileHandler(f, wallet, "")) // TODO: Pass appropriate IP/URL
+		outline.RegisterGetRoute(r, "/get/{merkle}", FindFileHandler(f, wallet, "")) // TODO: Pass appropriate IP/URL
🧰 Tools
🪛 golangci-lint (1.64.8)

66-66: undefined: myIp

(typecheck)


67-67: undefined: myIp

(typecheck)

strays/hands.go (1)

84-84: 💡 Verification agent

🧩 Analysis chain

Fix undefined variable reference.

The code references variable q which no longer exists after the method signature change. The parameter was renamed to queryClient.

-		m, wg := q.Add(msg)
+		m, wg := queryClient.Add(msg)

Wait, this doesn't look right either. Looking at the context, this should probably be referencing a queue instance that needs to be passed to this method or accessible through the Hand struct.

Let me verify how the queue is supposed to be accessed in this new architecture:


🏁 Script executed:

#!/bin/bash
# Description: Check how queue.Add is used in the new architecture and what interfaces provide Add method

# Search for Add method implementations in queue-related files
ast-grep --pattern 'func ($$$) Add($$$) ($$$) {
  $$$
}'

# Search for queue.Add usage patterns
rg -A 3 -B 3 '\.Add\('

Length of output: 6274


🏁 Script executed:

#!/bin/bash
# Inspect the Handler struct definition in strays/hands.go
echo "=== Handler struct ==="
rg -n "type Handler" -A 5 -B 2 strays/hands.go

# Inspect the function containing q.Add in strays/hands.go
echo "=== Function with q.Add ==="
rg -n "q.Add" -A 10 -B 10 strays/hands.go

Length of output: 655


🏁 Script executed:

#!/bin/bash
# Display file header (imports and struct definitions) to locate how the queue client is provided
echo "=== strays/hands.go: file header ==="
sed -n '1,60p' strays/hands.go

# Show full function containing the q.Add call, including its signature
echo "=== strays/hands.go: function around q.Add ==="
awk 'NR>=1 && NR<=200 { print NR ":" $0 }' strays/hands.go | sed -n '1,150p'

Length of output: 4844


Fix undefined q reference in Hand.Start

The Start method calls q.Add(msg), but q isn’t declared or passed in. You need to surface a queue.Queue instance to the Hand and use it in Start. For example:

• In strays/hands.go, add a q queue.Queue field to the Hand struct.
• In StrayManager.NewHand, initialize that field:

- h := &Hand{
-     offset: offset,
-     wallet: w,
-     stray:  nil,
- }
+ h := &Hand{
+     q:      q,
+     offset: offset,
+     wallet: w,
+     stray:  nil,
+ }

• In Hand.Start, replace

m, wg := q.Add(msg)

with

m, wg := h.q.Add(msg)

Alternatively, you could add q queue.Queue as a parameter to Start, but you must ensure it’s in scope before calling Add.

🧰 Tools
🪛 golangci-lint (1.64.8)

84-84: undefined: q

(typecheck)

strays/manager.go (1)

82-84: 🛠️ Refactor suggestion

Queue wait-group for grantMsg ignored

q.Add(grantMsg) returns its own *Message/*sync.WaitGroup, but the code never waits on it.
If fee-grant creation fails the error will be lost.

Consider collecting the returned Message and waiting on its WG similarly to the claimer message.

config/types.go (1)

199-210: ⚠️ Potential issue

Inconsistent Viper keys – will ignore defaults

viper.SetDefault("QueueConfig", …) uses QueueConfig (capital Q) while YAML/mapstructure tags use queue_config.
Viper treats keys case-insensitively only when all are lower-cased; mixed casing can lead to surprises.

Align the default key to the tag:

-	viper.SetDefault("QueueConfig", DefaultQueueConfig())
+	viper.SetDefault("queue_config", DefaultQueueConfig())
♻️ Duplicate comments (1)
queue/types.go (1)

4-10: Ensure batching requirements are addressed in the new worker system.

The previous comment from TheMarstonConnell mentioned batching 100 messages and posting 45 at a time. With the Queue struct removed and replaced by the worker pool system, verify that the new implementation addresses these batching performance requirements.

#!/bin/bash
# Search for batch configuration and implementation in the new queue system
rg -A 5 -B 5 "batch|Batch" queue/
rg -A 5 "TxBatchSize|QueueThreads" .
🧹 Nitpick comments (11)
wallet/wallet.go (1)

14-15: Improve function documentation.

The comment should be more descriptive about the purpose and behavior of the function.

-// creates count wallets from main wallet
+// CreateOffsetWallets creates a specified number of offset wallets derived from the main wallet.
+// Each wallet is cloned with an incremental byte offset starting from 1.
queue/types.go (1)

10-10: Fix grammar in error message.

The error message has a grammatical issue. Consider improving it for better clarity.

-var ErrReachedMaxRetry = errors.New("send retry attempt reached max retry")
+var ErrReachedMaxRetry = errors.New("maximum retry attempts reached")
testutil/mocks/setup.go (1)

22-27: Consider simplifying SetupGRPCConn implementation.

The intermediate grpc variable assignment is unnecessary and can be simplified for consistency with other setup functions.

 func SetupGRPCConn(t gomock.TestReporter) *MockGRPCConn {
 	ctrl := gomock.NewController(t)
-	grpc := NewMockGRPCConn(ctrl)
-
-	return grpc
+	return NewMockGRPCConn(ctrl)
 }
queue/worker.go (2)

42-73: Consider timer cleanup to prevent resource leaks.

The timer management in the main loop has a potential issue: when the timer fires (case <-timer.C:), it calls timer.Stop() but doesn't properly drain the channel. This could lead to resource leaks in high-frequency scenarios.

Consider this improvement:

		case <-timer.C:
			if len(w.batch) > 0 {
				w.send()
			}
-			timer.Stop()
+			if !timer.Stop() {
+				<-timer.C
+			}

84-84: Address the TODO comment for dependency injection.

The TODO comment indicates that the send method should use dependency injection. This aligns with the broader architectural changes in the PR.

Would you like me to help implement dependency injection for the send method or open an issue to track this improvement?

strays/manager.go (2)

25-33: Duration converted but later multiplied again – clarify intent

interval / refreshInterval are cast straight to time.Duration (=> interpreted as nanoseconds) and later multiplied by time.Second.
The code works numerically but is hard to reason about and fragile if someone removes the extra multiplication.

-		interval:        time.Duration(interval),
+		interval:        time.Duration(interval) * time.Second,
 ...
-		refreshInterval: time.Duration(refreshInterval),
+		refreshInterval: time.Duration(refreshInterval) * time.Second,

(or drop the later time.Second * multipliers).


95-101: Redundant parameter – queryClient already lives in the struct

Start receives a queryClient but immediately passes it through instead of using the instance saved in the struct.
Prefer one source of truth to avoid accidental divergence.

Options:

  1. Remove the parameter and use s.queryClient.
  2. Keep the parameter but don’t embed it in the struct.
queue/worker_test.go (2)

172-176: Type-mismatch in equality assertion

expectedSignedTx is []byte while actual is ttypes.Tx (a distinct named type).
require.EqualValues may treat them as unequal on some Go versions.
Convert explicitly to the same type:

r.EqualValues(expectedSignedTx, []byte(actual))

98-103: Unused wait-group workerRunning

workerRunning is declared but never waited on in TestWorkerSendMaxRetry, so goroutines may outlive the test scope.
Either wait on it or remove it.

config/types.go (1)

143-153: Doc-comment says “seconds” but fields are int8/64 without units

The struct embeds raw numbers; indicate unit in the field name or use time.Duration to avoid ambiguity.

Example:

TxTimerSeconds int16 `yaml:"tx_timer" mapstructure:"tx_timer"`
testutil/fake.go (1)

386-416: Consider improving the fake BroadcastTxCommit implementation.

The current implementation logs messages but returns an empty result. Consider:

  1. Simulating transaction hash generation
  2. Incrementing block height
  3. Returning appropriate response codes

This would make tests more realistic and help catch integration issues.

 func (m *FakeRPCClient) BroadcastTxCommit(arg0 context.Context, arg1 types1.Tx) (*coretypes.ResultBroadcastTxCommit, error) {
 
 	res := new(coretypes.ResultBroadcastTxCommit)
+	// Simulate transaction hash
+	res.Hash = bytes.HexBytes("fake_tx_hash_" + string(m.BlockHeight))
+	res.Height = m.BlockHeight
+	m.BlockHeight++
+	res.DeliverTx = abci.ResponseDeliverTx{Code: 0}
+	res.CheckTx = abci.ResponseCheckTx{Code: 0}
 
 	tx, err := GetTxDecoder()(arg1)
 	if err != nil {
 		panic(err)
 	}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1d97b4a and 52264ba.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (27)
  • api/client.go (1 hunks)
  • api/file_handler.go (4 hunks)
  • api/server.go (3 hunks)
  • cmd/start.go (2 hunks)
  • config/types.go (5 hunks)
  • core/app.go (8 hunks)
  • go.mod (9 hunks)
  • network/downloads.go (1 hunks)
  • proofs/proofs.go (8 hunks)
  • proofs/types.go (2 hunks)
  • queue/monitoring.go (1 hunks)
  • queue/pool.go (1 hunks)
  • queue/pool_bench_test.go (1 hunks)
  • queue/pool_test.go (1 hunks)
  • queue/queue.go (1 hunks)
  • queue/types.go (1 hunks)
  • queue/worker.go (1 hunks)
  • queue/worker_test.go (1 hunks)
  • strays/hands.go (3 hunks)
  • strays/manager.go (7 hunks)
  • strays/types.go (1 hunks)
  • testutil/fake.go (1 hunks)
  • testutil/mocks/expected_mocks.go (1 hunks)
  • testutil/mocks/mocks.go (1 hunks)
  • testutil/mocks/setup.go (1 hunks)
  • testutil/txBuilder.go (1 hunks)
  • wallet/wallet.go (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (11)
queue/queue.go (1)
queue/types.go (1)
  • Message (12-18)
cmd/start.go (1)
core/app.go (3)
  • Option (54-54)
  • WithTestMode (56-63)
  • NewApp (68-139)
queue/pool_test.go (2)
queue/types.go (1)
  • Message (12-18)
queue/pool.go (1)
  • Pool (24-29)
testutil/mocks/setup.go (2)
testutil/mocks/mocks.go (9)
  • MockAuthQueryClient (29-33)
  • NewMockAuthQueryClient (41-45)
  • MockServiceClient (133-137)
  • NewMockServiceClient (145-149)
  • MockRPCClient (257-261)
  • NewMockRPCClient (269-273)
  • MockGRPCConn (870-874)
  • NewMockGRPCConn (882-886)
  • NewMockStorageQueryClient (945-949)
testutil/mocks/expected_mocks.go (1)
  • StorageQueryClient (28-30)
strays/hands.go (5)
strays/types.go (2)
  • Hand (11-16)
  • StrayManager (18-30)
file_system/types.go (1)
  • FileSystem (17-22)
network/downloads.go (1)
  • DownloadFile (27-69)
utils/ipfsParams.go (1)
  • GetIPFSParams (11-39)
queue/queue.go (1)
  • Queue (16-20)
queue/pool_bench_test.go (3)
config/types.go (1)
  • QueueConfig (143-153)
queue/pool.go (1)
  • Pool (24-29)
testutil/mocks/expected_mocks.go (1)
  • RPCClient (20-22)
queue/worker.go (1)
queue/types.go (2)
  • Message (12-18)
  • ErrReachedMaxRetry (10-10)
queue/worker_test.go (5)
testutil/mocks/mocks.go (3)
  • MockAuthQueryClient (29-33)
  • MockServiceClient (133-137)
  • MockRPCClient (257-261)
config/types.go (2)
  • ChainConfig (17-23)
  • Seed (11-14)
wallet/wallet.go (1)
  • CreateWallet (27-50)
testutil/mocks/setup.go (3)
  • SetupAuthClient (7-10)
  • SetupServiceClient (12-15)
  • SetupRPCClient (17-20)
queue/types.go (2)
  • Message (12-18)
  • ErrReachedMaxRetry (10-10)
proofs/proofs.go (3)
queue/queue.go (1)
  • Queue (16-20)
proofs/types.go (2)
  • FileSystem (28-32)
  • Prover (15-26)
file_system/types.go (1)
  • FileSystem (17-22)
strays/manager.go (2)
queue/queue.go (1)
  • Queue (16-20)
strays/types.go (1)
  • StrayManager (18-30)
queue/pool.go (3)
queue/queue.go (1)
  • Queue (16-20)
queue/types.go (1)
  • Message (12-18)
config/types.go (1)
  • QueueConfig (143-153)
🪛 GitHub Check: Test
queue/queue.go

[failure] 11-11:
"github.com/jackalLabs/canine-chain/v4/x/storage/types" imported as storageTypes and not used


[failure] 8-8:
"strings" imported and not used


[failure] 7-7:
"fmt" imported and not used


[failure] 6-6:
"errors" imported and not used


[failure] 5-5:
"bytes" imported and not used

🪛 GitHub Check: Build
queue/queue.go

[failure] 11-11:
"github.com/jackalLabs/canine-chain/v4/x/storage/types" imported as storageTypes and not used


[failure] 8-8:
"strings" imported and not used


[failure] 7-7:
"fmt" imported and not used


[failure] 6-6:
"errors" imported and not used


[failure] 5-5:
"bytes" imported and not used

🪛 GitHub Check: cov
queue/queue.go

[failure] 11-11:
"github.com/jackalLabs/canine-chain/v4/x/storage/types" imported as storageTypes and not used


[failure] 8-8:
"strings" imported and not used


[failure] 7-7:
"fmt" imported and not used


[failure] 6-6:
"errors" imported and not used


[failure] 5-5:
"bytes" imported and not used

🪛 GitHub Actions: Test Coverage
queue/queue.go

[error] 5-5: "bytes" imported and not used

🪛 GitHub Actions: Unit Test
queue/queue.go

[error] 5-5: "bytes" imported and not used

🪛 GitHub Actions: Build
queue/queue.go

[error] 5-5: "bytes" imported and not used

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: lint
🔇 Additional comments (39)
strays/types.go (1)

29-29: Good use of dependency injection.

Adding the queryClient field to the StrayManager struct follows clean architecture principles and improves testability by accepting the interface rather than creating clients internally.

wallet/wallet.go (2)

17-17: Verify the intentional offset calculation.

The offset starts from byte(i + 1) rather than byte(i), meaning the first wallet has offset 1, not 0. Confirm this is intentional behavior for your wallet derivation logic.


16-16:

Details

✅ Verification successful

Verify Go version compatibility for range syntax.

The for i := range count syntax was introduced in Go 1.22. Ensure your project's go.mod file specifies a compatible Go version.


🏁 Script executed:

#!/bin/bash
# Check Go version requirement in go.mod
cat go.mod | grep "^go "

Length of output: 36


Go version compatibility confirmed
The for i := range count syntax is supported since Go 1.22, and your go.mod specifies Go 1.23.5—no changes required.

api/client.go (1)

14-14: LGTM! Clean dependency injection refactor.

The function signature change to accept an external QueryClient improves testability and follows dependency injection best practices. This aligns well with the broader refactor mentioned in the AI summary.

cmd/start.go (2)

48-52: LGTM! Proper flag handling implementation.

The test mode flag implementation is well-structured. Using cmd.Flags().Changed("test_mode") correctly distinguishes between default false and explicitly set false values.


70-70: LGTM! Clear flag definition.

The flag definition is clear and descriptive for enabling test mode without chain network dependency.

queue/queue.go (1)

16-20: LGTM! Clean interface abstraction.

The Queue interface provides a clean abstraction for queue behavior with appropriate method signatures for the new pool-based implementation.

api/server.go (2)

51-51: Good refactoring to inject queryClient dependency.

The change from accepting myIp to queryClient aligns well with the dependency injection pattern described in the PR objectives. This improves testability and consistency across the codebase.


59-60:

Details

✅ Verification successful

Verify handler signature compatibility.

Ensure that the updated handler calls with the queryClient parameter match the expected function signatures in their respective implementations.

Also applies to: 73-73


🏁 Script executed:

#!/bin/bash
# Verify that PostFileHandler, PostFileHandlerV2, and SpaceHandler accept queryClient parameter
ast-grep --pattern 'func PostFileHandler($$$, queryClient $_, $$$) $$$'
ast-grep --pattern 'func PostFileHandlerV2($$$, queryClient $_, $$$) $$$'
ast-grep --pattern 'func SpaceHandler($$$, queryClient $_, $$$) $$$'

Length of output: 220


Here’s a more targeted check to locate each handler’s signature and confirm the presence of queryClient:


🏁 Script executed:

#!/bin/bash
# Search for PostFileHandler, PostFileHandlerV2, and SpaceHandler definitions with context
rg -n -C2 "func PostFileHandler" .
rg -n -C2 "func PostFileHandlerV2" .
rg -n -C2 "func SpaceHandler" .

Length of output: 1793


Verified: Handler signatures include QueryClient correctly

All three handlers accept a storageTypes.QueryClient parameter in the expected position and with the correct type—no changes needed.

• api/file_handler.go:
– PostFileHandler(fio, prover, wl, query storageTypes.QueryClient, chunkSize)
– PostFileHandlerV2(fio, prover, wl, queryClient storageTypes.QueryClient, chunkSize)
• api/client.go:
– SpaceHandler(c, queryClient storageTypes.QueryClient, address)

testutil/txBuilder.go (1)

1-26: Well-structured test utilities for transaction handling.

The implementation is clean and follows good practices:

  • Clear separation of concerns between decoder creation and config setup
  • Proper registration of all necessary module interfaces
  • Consistent naming conventions and appropriate dependencies

This will provide solid foundation for testing transaction encoding/decoding across the application.

testutil/mocks/setup.go (1)

1-33: Well-designed mock setup utilities.

The setup functions provide a clean abstraction for creating mock clients with consistent patterns. Good use of the gomock framework and proper encapsulation of mock creation logic.

strays/hands.go (3)

25-25: LGTM! Method signature updated for dependency injection.

The change from queue.Queue to types.QueryClient parameter aligns with the broader refactor to inject query clients instead of creating them internally.


43-43: LGTM! Function call updated correctly.

The network.DownloadFile call has been correctly updated to pass the new queryClient parameter.


118-118: Parameter type changed from pointer to value.

The parameter type changed from *queue.Queue to queue.Queue, consistent with the broader refactoring where queue pointers were replaced by value types.

go.mod (1)

10-34: LGTM! Core dependencies updated appropriately.

The dependency updates for core libraries like cosmos/gogoproto, prometheus/client_golang, spf13/viper, and google.golang.org/grpc support the architectural changes described in the PR.

testutil/mocks/expected_mocks.go (1)

1-31: LGTM! Well-designed interface aliases for testing.

The interface aliases provide clean abstractions for mocking key client types (auth, transaction service, RPC, gRPC, storage). This supports the dependency injection refactor and follows Go testing best practices.

queue/pool_bench_test.go (3)

23-29: LGTM! Reasonable benchmark configuration.

The queue configuration values provide a good balance for benchmarking - 4 threads, batch size of 45, and reasonable retry/timer settings.


31-46: LGTM! Well-structured pool setup helper.

The setupNewPool function properly creates worker wallets and initializes the pool with appropriate channels and workers.


49-83: LGTM! Comprehensive benchmark implementation.

The benchmark follows Go conventions:

  • Proper mock setup for external dependencies
  • Correct use of b.ResetTimer() to exclude setup time
  • Clean resource management with defer pool.Stop()
  • Realistic test scenario with MsgPostProof
proofs/types.go (3)

22-24: LGTM! Thread safety improvements implemented correctly.

The changes to use int32 for threads and atomic.Int32 for currentThreads provide proper thread-safe operations. The atomic type ensures safe concurrent access to the thread count.


25-25: LGTM! Query client dependency injection added.

Adding the query field enables proper dependency injection of the query client, which aligns with the architectural changes described in the PR objectives.


34-44: LGTM! Atomic operations implemented correctly.

The Inc(), Dec(), and Full() methods properly use atomic operations for thread-safe access to currentThreads. The implementation correctly uses Add(1), Add(-1), and Load() methods.

api/file_handler.go (2)

51-51: LGTM! Dependency injection pattern implemented correctly.

The function signature change to accept query storageTypes.QueryClient parameter enables proper dependency injection and centralizes query client management.


156-156: LGTM! Consistent dependency injection pattern.

The PostFileHandlerV2 function follows the same dependency injection pattern as PostFileHandler, maintaining consistency across the API handlers.

queue/worker.go (2)

19-28: LGTM! Worker struct design is well-structured.

The worker struct properly encapsulates all necessary components for batch processing, including wallet, channels, timing, and synchronization primitives.


121-125: LGTM! Proper message completion handling.

The worker correctly sets the response and error on each message and signals completion via the WaitGroup, ensuring proper synchronization with calling code.

proofs/proofs.go (4)

38-38: LGTM! Enhanced debug logging improves observability.

The additional debug logging statements provide better visibility into the proof generation process, which will aid in debugging and monitoring.

Also applies to: 62-62


99-99: LGTM! Query client dependency injection implemented correctly.

The code now uses the injected p.query client instead of creating new clients internally, which aligns with the architectural changes and improves testability.

Also applies to: 124-124


390-405: LGTM! Constructor properly implements dependency injection.

The NewProver function correctly accepts the query client parameter and initializes all fields appropriately. The type conversion from int16 to int32 for threads aligns with the atomic operations changes in types.go.


297-297: LGTM! Comprehensive logging improvements enhance observability.

The enhanced logging provides valuable insights into the proof cycle timing, file processing, and error conditions. The structured logging with hex encoding for merkle hashes and time differences will be helpful for debugging and monitoring.

Also applies to: 314-314, 320-320, 323-323

strays/manager.go (1)

3-9: slices package requires Go ≥ 1.21

Importing the std-lib slices package ties the module to Go 1.21+.
If the project’s go directive is lower, builds will break. Verify go.mod accordingly.

queue/worker_test.go (1)

153-158: “Success” path returns a gRPC error with code OK

status.Error(codes.OK, "no error") creates a non-nil error value; many callers treat any non-nil error as failure.
Return nil for the error and provide an actual TxResponse.

-			return nil, status.Error(codes.OK, "no error")
+			return &sdkTypes.TxResponse{Code: 0}, nil
core/app.go (2)

19-19: Verify the testutil import is used correctly.

The testutil package is imported but only used within the WithTestMode function. Ensure this import is necessary and properly utilized.


276-277: LGTM! Clean separation of wallet allocation.

The code correctly splits the offset wallets between the queue pool and stray manager using slice operations.

testutil/fake.go (1)

391-391:

Details

❌ Incorrect review comment

Missing function definition for GetTxDecoder.

The GetTxDecoder() function is called but not defined or imported. This will cause a compilation error.


🏁 Script executed:

#!/bin/bash
# Search for GetTxDecoder definition in the codebase
rg -A 5 "func GetTxDecoder"

# Also check if it's exported from another package
ast-grep --pattern 'GetTxDecoder() $_'

Length of output: 362


GetTxDecoder is already defined in testutil/txBuilder.go
The function GetTxDecoder() is implemented in testutil/txBuilder.go (returns sdk.TxDecoder) and lives in the same package as fake.go, so the call on line 391 of testutil/fake.go is valid. No missing definition—this comment can be dismissed.

Likely an incorrect or invalid review comment.

queue/pool.go (3)

38-46: LGTM! Good defensive programming with panic checks.

The validation ensures workers and channels are properly initialized and matching in size.


87-100: Efficient use of reflect.Select for channel multiplexing.

The implementation correctly uses reflect.Select to send messages to any available worker without blocking. This is a good pattern for load distribution.


177-183:

Details

❌ Incorrect review comment

Remove unused function.

The newOffsetWallet function is defined but never used in this file. Consider removing it to reduce code clutter.


🏁 Script executed:

#!/bin/bash
# Check if newOffsetWallet is used anywhere in the codebase
rg "newOffsetWallet" --type go

Length of output: 184


Function newOffsetWallet Is Used in Benchmark Tests; No Removal Needed

The newOffsetWallet helper is referenced in queue/pool_bench_test.go, so it isn’t unused. If its sole purpose is for benchmarking, consider moving its definition into the test file for clarity.

• Usage found in:

  • queue/pool_bench_test.go: workerWallet := newOffsetWallet(w, int(i))

Likely an incorrect or invalid review comment.

testutil/mocks/mocks.go (1)

1-1555: Skip review for auto-generated mock code.

This file is fully generated by MockGen (mockgen -source=expected_mocks.go) and is not intended for manual edits. No manual review is required.

Help: "The number of messages currently in the queue",
})

var _ = queueSize
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider integrating the monitoring metric properly instead of using a temporary workaround.

The var _ = queueSize assignment is a common Go idiom to silence unused variable warnings, but it suggests the monitoring infrastructure isn't fully integrated yet. Consider connecting this metric to the new queue pool system to provide actual monitoring capabilities.

🤖 Prompt for AI Agents
In queue/monitoring.go at line 13, the variable queueSize is currently assigned
to the blank identifier to avoid unused variable warnings, indicating incomplete
integration. Replace this workaround by properly connecting the queueSize metric
to the new queue pool system, ensuring it actively tracks and reports queue
sizes as intended for effective monitoring.

Comment on lines +12 to +48
func TestPoolSendToAny(t *testing.T) {
r := require.New(t)
queue := make([]chan *Message, 0)

for range 4 {
queue = append(queue, make(chan *Message))
}

pool := Pool{
workerChannels: queue,
}

msg := Message{
msg: types.NewMsgPostProof("", []byte("hello"), "owner", 0, []byte("item"), []byte("list"), 0),
wg: nil,
}

wg := sync.WaitGroup{}
go func() {
t.Logf("starting goroutine(worker): %d", len(queue)-1)
for m := range queue[len(queue)-1] {
t.Logf("worker[%d]: message received", len(queue)-1)
_ = m
wg.Done()
}
}()

msgCount := 8
for range msgCount {
wg.Add(1)
to := pool.sendToAny(&msg)
r.EqualValues(len(queue)-1, to)
}

wg.Wait()
close(queue[len(queue)-1])
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Clarify test expectations and improve coverage.

The test has several concerns:

  1. Misleading test logic: The test expects all messages to go to the last worker (len(queue)-1), but the method name sendToAny suggests messages should be distributed across workers.

  2. Incomplete coverage: The test creates 4 worker channels but only listens on one, missing opportunities to verify actual distribution behavior.

  3. Test name mismatch: TestPoolSendToAny implies testing distribution to any worker, but the assertions expect all messages to go to a specific worker.

Consider these improvements:

func TestPoolSendToAny(t *testing.T) {
	r := require.New(t)
-	queue := make([]chan *Message, 0)
+	workerCount := 4
+	queue := make([]chan *Message, workerCount)
+	received := make([]int, workerCount) // Track messages per worker

-	for range 4 {
-		queue = append(queue, make(chan *Message))
+	for i := 0; i < workerCount; i++ {
+		queue[i] = make(chan *Message)
	}

	pool := Pool{
		workerChannels: queue,
	}

	// Listen on all channels to verify distribution
	wg := sync.WaitGroup{}
+	for i := 0; i < workerCount; i++ {
+		go func(workerID int) {
+			for m := range queue[workerID] {
+				received[workerID]++
+				_ = m
+				wg.Done()
+			}
+		}(i)
+	}

	msgCount := 8
	for range msgCount {
		wg.Add(1)
		to := pool.sendToAny(&msg)
+		r.True(to >= 0 && to < workerCount, "Worker index should be valid")
	}

	wg.Wait()
+	// Verify messages were distributed (adjust assertion based on actual sendToAny behavior)
+	totalReceived := 0
+	for i := 0; i < workerCount; i++ {
+		close(queue[i])
+		totalReceived += received[i]
+	}
+	r.Equal(msgCount, totalReceived, "All messages should be received")
}

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In queue/pool_test.go lines 12 to 48, the test incorrectly expects all messages
to be sent to the last worker channel, which contradicts the intended behavior
of sendToAny that should distribute messages across all workers. To fix this,
update the test to listen on all worker channels by starting a goroutine for
each channel to receive messages and track which workers get messages. Adjust
assertions to verify that messages are distributed among multiple workers rather
than all going to one. Rename the test if needed to better reflect its purpose
and ensure coverage includes all worker channels.

cl := types.NewQueryClient(wallet.Client.GRPCConn)

res, err := cl.FindFile(context.Background(), queryParams)
res, err := queryClinet.FindFile(context.Background(), queryParams)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update variable reference to match corrected parameter name.

Update the variable reference to use the corrected parameter name.

-	res, err := queryClinet.FindFile(context.Background(), queryParams)
+	res, err := queryClient.FindFile(context.Background(), queryParams)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
res, err := queryClinet.FindFile(context.Background(), queryParams)
res, err := queryClient.FindFile(context.Background(), queryParams)
🤖 Prompt for AI Agents
In network/downloads.go at line 32, the variable name 'queryClinet' is a typo
and should be updated to 'queryClient' to match the corrected parameter name.
Replace all instances of 'queryClinet' with 'queryClient' to fix the reference.

// It queries the provider network, tries each available provider until the file is successfully downloaded and matches the expected size, and writes the file to the local file system.
// Returns an error if the file cannot be found or downloaded from any provider.
func DownloadFile(f *file_system.FileSystem, merkle []byte, owner string, start int64, wallet *wallet.Wallet, fileSize int64, myUrl string, chunkSize int64, proofType int64, ipfsParams *ipfslite.AddParams) error {
func DownloadFile(f *file_system.FileSystem, merkle []byte, owner string, start int64, wallet *wallet.Wallet, queryClinet types.QueryClient, fileSize int64, myUrl string, chunkSize int64, proofType int64, ipfsParams *ipfslite.AddParams) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix parameter name typo.

The parameter name queryClinet contains a typo and should be queryClient for consistency with the rest of the codebase.

-func DownloadFile(f *file_system.FileSystem, merkle []byte, owner string, start int64, wallet *wallet.Wallet, queryClinet types.QueryClient, fileSize int64, myUrl string, chunkSize int64, proofType int64, ipfsParams *ipfslite.AddParams) error {
+func DownloadFile(f *file_system.FileSystem, merkle []byte, owner string, start int64, wallet *wallet.Wallet, queryClient types.QueryClient, fileSize int64, myUrl string, chunkSize int64, proofType int64, ipfsParams *ipfslite.AddParams) error {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func DownloadFile(f *file_system.FileSystem, merkle []byte, owner string, start int64, wallet *wallet.Wallet, queryClinet types.QueryClient, fileSize int64, myUrl string, chunkSize int64, proofType int64, ipfsParams *ipfslite.AddParams) error {
func DownloadFile(f *file_system.FileSystem, merkle []byte, owner string, start int64, wallet *wallet.Wallet, queryClient types.QueryClient, fileSize int64, myUrl string, chunkSize int64, proofType int64, ipfsParams *ipfslite.AddParams) error {
🤖 Prompt for AI Agents
In network/downloads.go at line 27, rename the parameter `queryClinet` to
`queryClient` to correct the typo and maintain consistency with the rest of the
codebase.

Comment on lines +46 to +61
wallet, err := sequoiaWallet.CreateWallet(s.SeedPhrase, s.DerivationPath, chainCfg)
if err != nil {
t.Errorf(err.Error())
}

queryClient := mocks.SetupAuthClient(t)

baseAcc := authTypes.ProtoBaseAccount()
err = baseAcc.SetAddress(sdkTypes.AccAddress(wallet.AccAddress()))
if err != nil {
t.Errorf(err.Error())
}
any, err := codectypes.NewAnyWithValue(baseAcc)
if err != nil {
t.Errorf(err.Error())
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Tests continue after fatal setup errors

setupWalletClient logs errors with t.Errorf but then proceeds, which can produce cascading nil-pointer failures and misleading test results.
Use t.Fatalf (or require.NoError) to abort on unrecoverable setup errors.

🤖 Prompt for AI Agents
In queue/worker_test.go around lines 46 to 61, the test setup uses t.Errorf to
log errors but continues execution, risking nil-pointer dereferences and
confusing test failures. Replace all t.Errorf calls in this setup code with
t.Fatalf or use require.NoError to immediately stop the test on these critical
errors, ensuring the test does not proceed with invalid state.

Comment on lines +231 to +235
offsetWalletCount := cfg.QueueConfig.QueueThreads + int8(cfg.StrayManagerCfg.HandCount)
offsetWallets, err := sequoiaWallet.CreateOffsetWallets(a.wallet, int(offsetWalletCount))
if err != nil {
return err
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider integer overflow when calculating offsetWalletCount.

The calculation cfg.QueueConfig.QueueThreads + int8(cfg.StrayManagerCfg.HandCount) could overflow if the sum exceeds 127 (int8 max). Consider using a larger integer type for safety.

-offsetWalletCount := cfg.QueueConfig.QueueThreads + int8(cfg.StrayManagerCfg.HandCount)
-offsetWallets, err := sequoiaWallet.CreateOffsetWallets(a.wallet, int(offsetWalletCount))
+offsetWalletCount := int(cfg.QueueConfig.QueueThreads) + int(cfg.StrayManagerCfg.HandCount)
+offsetWallets, err := sequoiaWallet.CreateOffsetWallets(a.wallet, offsetWalletCount)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
offsetWalletCount := cfg.QueueConfig.QueueThreads + int8(cfg.StrayManagerCfg.HandCount)
offsetWallets, err := sequoiaWallet.CreateOffsetWallets(a.wallet, int(offsetWalletCount))
if err != nil {
return err
}
offsetWalletCount := int(cfg.QueueConfig.QueueThreads) + int(cfg.StrayManagerCfg.HandCount)
offsetWallets, err := sequoiaWallet.CreateOffsetWallets(a.wallet, offsetWalletCount)
if err != nil {
return err
}
🤖 Prompt for AI Agents
In core/app.go around lines 231 to 235, the calculation of offsetWalletCount
uses int8, which can overflow if the sum exceeds 127. Change the type of
offsetWalletCount to a larger integer type such as int or int16 to safely hold
the sum of cfg.QueueConfig.QueueThreads and cfg.StrayManagerCfg.HandCount before
passing it to CreateOffsetWallets.

queue/pool.go Outdated
}

func newOffsetWallet(main *wallet.Wallet, index int) *wallet.Wallet {
w, err := main.CloneWalletOffset(byte(index + 1))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential byte overflow in wallet offset calculation.

Casting index + 1 to byte could overflow if index >= 255, resulting in unexpected wallet offsets.

-w, err := main.CloneWalletOffset(byte(index + 1))
+if index >= 255 {
+    panic("wallet index too large: maximum 254 supported")
+}
+w, err := main.CloneWalletOffset(byte(index + 1))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
w, err := main.CloneWalletOffset(byte(index + 1))
if index >= 255 {
panic("wallet index too large: maximum 254 supported")
}
w, err := main.CloneWalletOffset(byte(index + 1))
🤖 Prompt for AI Agents
In queue/pool.go at line 178, casting the expression `index + 1` directly to
byte risks overflow when index is 255 or greater, causing incorrect wallet
offsets. To fix this, ensure the value passed to CloneWalletOffset is safely
bounded within the byte range or use a larger integer type if supported. Add
checks or constraints to prevent overflow before casting or refactor the
function to accept an integer type that can handle larger values.

queue/pool.go Outdated

workers = make([]*worker, 0, len(workerWallets))
for i, w := range workerWallets {
worker := newWorker(int8(i), w, txTimer, batchSize, maxRetryAttempt, wChannels[i], workerRunning)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential integer overflow when casting worker index.

Casting i to int8 could overflow if there are more than 127 workers, leading to negative worker IDs.

-worker := newWorker(int8(i), w, txTimer, batchSize, maxRetryAttempt, wChannels[i], workerRunning)
+if i > 127 {
+    return nil, nil, errors.New("too many workers: maximum 127 workers supported")
+}
+worker := newWorker(int8(i), w, txTimer, batchSize, maxRetryAttempt, wChannels[i], workerRunning)
🤖 Prompt for AI Agents
In queue/pool.go at line 113, the worker index variable `i` is cast to int8,
which can overflow if `i` exceeds 127, causing negative worker IDs. To fix this,
change the type cast to a larger integer type such as int or int16 that can
safely hold the maximum number of workers without overflow. Ensure the new type
is compatible with the newWorker function parameter.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
api/server.go (1)

103-116: Clarify the purpose of the goroutine + channel pattern.

The server startup was changed from a blocking call to a goroutine + channel pattern, but it still blocks immediately waiting for an error. This adds complexity without apparent benefit unless it's part of a specific graceful shutdown strategy.

Could you clarify the reasoning behind this change? If the method is intended to be non-blocking, it should return immediately. If it's meant to support graceful shutdown, the pattern should be documented.

The error handling improvement to ignore http.ErrServerClosed is good practice.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ebe4c04 and 4547d2e.

📒 Files selected for processing (3)
  • api/server.go (4 hunks)
  • file_system/file_system.go (2 hunks)
  • network/downloads.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • network/downloads.go
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: lint
🔇 Additional comments (5)
file_system/file_system.go (3)

554-557: Improved error handling in anonymous function.

Good improvement! The anonymous function now properly returns errors instead of ignoring them, which ensures that database access errors are properly propagated up the call stack.


588-592: Excellent resource management improvement.

The deferred close with error logging is a great addition that ensures proper resource cleanup and visibility into potential issues during file closure.


599-599: Explicit return prevents stale error returns.

The explicit return fileData, nil is a good defensive programming practice that ensures no stale errors are returned after successful operations.

api/server.go (2)

16-16: Good dependency injection pattern.

The addition of the queryClient parameter follows a clean dependency injection pattern, improving modularity and testability by allowing external clients to be passed in rather than created internally.

Also applies to: 51-51


59-60: Consistent queryClient usage.

The handlers are correctly updated to receive the injected queryClient, maintaining consistency with the dependency injection pattern.

Also applies to: 73-73

dahn510 added 3 commits June 11, 2025 20:46
fee granter is main wallet and queue worker is offset wallet
when stray manager adds new claimer, it uses main wallet as fee granter
queue worker's offset wallet can't send feegrant msg because chain uses
the granter's wallet to authorize the tx.
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
core/app.go (1)

236-238: Int8 overflow & previous feedback not addressed
The same concern raised in an earlier review remains: adding two values and storing them in an int8 risks overflow (> 127). Use int.

-offsetWalletCount := cfg.QueueConfig.QueueThreads + int8(cfg.StrayManagerCfg.HandCount)
+offsetWalletCount := int(cfg.QueueConfig.QueueThreads) + int(cfg.StrayManagerCfg.HandCount)
🧹 Nitpick comments (3)
core/app.go (3)

59-67: WithTestMode leaves a live gRPC connection open

NewApp establishes a real GRPCConn and immediately constructs a QueryClient from it; WithTestMode later swaps the client for a fake but never closes the underlying connection.
Leaking this connection in unit/integration tests slows them down and may hit file-descriptor limits.

 func WithTestMode() Option {
 	return func(app *App) {
+	// Close the real connection opened in NewApp.
+	if app.wallet.Client.GRPCConn != nil {
+		_ = app.wallet.Client.GRPCConn.Close()
+	}
 		app.queryClient = testutil.NewFakeStorageQueryClient()
 		app.wallet.Client.AuthClient = testutil.NewFakeAuthQueryClient(app.wallet)
 		app.wallet.Client.RPCClient = testutil.NewFakeRPCClient()
 		app.wallet.Client.TxClient = testutil.NewFakeServiceClient()
 	}
 }

129-138: Avoid constructing a real QueryClient when it may be overwritten

queryClient := storageTypes.NewQueryClient(w.Client.GRPCConn) is created before any Options are applied, then potentially discarded by WithTestMode.
Delay construction until after the options loop to prevent unnecessary network dials and simplify test stubbing.


212-221: Redundant parameter in GetStorageParams

GetStorageParams belongs to App yet requires an explicit storageTypes.QueryClient argument even though a.queryClient exists. This adds noise to every call.

-func (a *App) GetStorageParams(queryClient storageTypes.QueryClient) (storageTypes.Params, error) {
-    res, err := queryClient.Params(context.Background(), &storageTypes.QueryParams{})
+func (a *App) GetStorageParams() (storageTypes.Params, error) {
+    res, err := a.queryClient.Params(context.Background(), &storageTypes.QueryParams{})
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a1aa8ac and 52c60ed.

📒 Files selected for processing (3)
  • core/app.go (6 hunks)
  • queue/pool.go (1 hunks)
  • strays/manager.go (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • strays/manager.go
  • queue/pool.go
🧰 Additional context used
🧬 Code Graph Analysis (1)
core/app.go (9)
queue/queue.go (1)
  • Queue (9-13)
proofs/types.go (2)
  • Prover (15-26)
  • FileSystem (28-32)
monitoring/types.go (2)
  • Monitor (31-34)
  • NewMonitor (36-41)
file_system/types.go (1)
  • FileSystem (17-22)
testutil/fake.go (4)
  • NewFakeStorageQueryClient (31-33)
  • NewFakeAuthQueryClient (225-229)
  • NewFakeRPCClient (315-319)
  • NewFakeServiceClient (272-274)
wallet/wallet.go (1)
  • CreateOffsetWallets (15-25)
queue/pool.go (1)
  • NewPool (24-44)
proofs/proofs.go (1)
  • NewProver (390-405)
strays/manager.go (1)
  • NewStrayManager (18-32)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: lint
🔇 Additional comments (1)
core/app.go (1)

14-15: Confirm build toolchain ≥ Go 1.21 for slices usage

golang.org/x/exp/slices moved into the std-lib in Go 1.21 (slices). If your CI or downstream consumers are still pinned to ≤ 1.20, compilation will fail.
Verify that the go directive in go.mod, CI images and Dockerfiles are updated accordingly.

Comment on lines +351 to +370
func (a *App) authorizeClaimer(claimer *wallet.Wallet) error {
allowance := feegrant.BasicAllowance{
SpendLimit: nil,
Expiration: nil,
}

wadd, err := types.AccAddressFromBech32(a.wallet.AccAddress())
if err != nil {
return err
}

hadd, err := types.AccAddressFromBech32(claimer.AccAddress())
if err != nil {
return err
}

grantMsg, nerr := feegrant.NewMsgGrantAllowance(&allowance, wadd, hadd)
if nerr != nil {
return err
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect error variable returned in authorizeClaimer

grantMsg, nerr := feegrant.NewMsgGrantAllowance...
The subsequent check returns err, which is nil here, swallowing the real error.

-grantMsg, nerr := feegrant.NewMsgGrantAllowance(&allowance, wadd, hadd)
-if nerr != nil {
-    return err
-}
+grantMsg, nerr := feegrant.NewMsgGrantAllowance(&allowance, wadd, hadd)
+if nerr != nil {
+    return nerr
+}

Also consider embedding this pattern in a lint rule to prevent similar shadowing mistakes.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (a *App) authorizeClaimer(claimer *wallet.Wallet) error {
allowance := feegrant.BasicAllowance{
SpendLimit: nil,
Expiration: nil,
}
wadd, err := types.AccAddressFromBech32(a.wallet.AccAddress())
if err != nil {
return err
}
hadd, err := types.AccAddressFromBech32(claimer.AccAddress())
if err != nil {
return err
}
grantMsg, nerr := feegrant.NewMsgGrantAllowance(&allowance, wadd, hadd)
if nerr != nil {
return err
}
func (a *App) authorizeClaimer(claimer *wallet.Wallet) error {
allowance := feegrant.BasicAllowance{
SpendLimit: nil,
Expiration: nil,
}
wadd, err := types.AccAddressFromBech32(a.wallet.AccAddress())
if err != nil {
return err
}
hadd, err := types.AccAddressFromBech32(claimer.AccAddress())
if err != nil {
return err
}
grantMsg, nerr := feegrant.NewMsgGrantAllowance(&allowance, wadd, hadd)
if nerr != nil {
return nerr
}
// …rest of function…
}
🤖 Prompt for AI Agents
In core/app.go lines 351 to 370, the error check after calling
feegrant.NewMsgGrantAllowance incorrectly returns the variable 'err' instead of
'nerr', which causes the real error to be ignored. Fix this by returning 'nerr'
in the error check to properly propagate the error from NewMsgGrantAllowance.

Comment on lines +284 to +295
params, err := a.GetStorageParams(a.queryClient)
if err != nil {
return err
}

a.q = queue.NewQueue(a.wallet, cfg.QueueInterval)
queueWallets := offsetWallets[:cfg.QueueConfig.QueueThreads]
a.q, err = queue.NewPool(a.wallet, a.queryClient, queueWallets, cfg.QueueConfig)
if err != nil {
log.Error().Err(err).Msg("failed to initialize Queue module")
return err
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Slice bound uses int8; code will not compile

queueWallets := offsetWallets[:cfg.QueueConfig.QueueThreads] fails when QueueThreads is int8:

invalid slice index cfg.QueueConfig.QueueThreads (type int8)

Cast to int once and reuse:

-threadCount := cfg.QueueConfig.QueueThreads
-queueWallets := offsetWallets[:threadCount]
+threadCount := int(cfg.QueueConfig.QueueThreads)
+queueWallets := offsetWallets[:threadCount]

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In core/app.go around lines 284 to 295, the slice operation uses
cfg.QueueConfig.QueueThreads which is of type int8, causing a compile error
because slice indices must be int. Fix this by casting
cfg.QueueConfig.QueueThreads to int before using it as a slice index, and reuse
the casted int value to avoid repeated conversions.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
core/app.go (1)

242-276: res may be nil – panic-prone control flow

If the first call to Provider fails, the code runs initProviderOnChain but continues to use res.Provider.*, even though res is still nil. This leads to a nil-pointer panic and makes the provider-initialisation path unusable.

-res, err := a.queryClient.Provider(context.Background(), queryParams)
-if err != nil {
-    log.Info().Err(err).Msg("Provider does not exist on network or is not connected...")
-    err := initProviderOnChain(a.wallet, cfg.Ip, cfg.TotalSpace)
-    if err != nil {
-        return err
-    }
+res, err := a.queryClient.Provider(context.Background(), queryParams)
+if err != nil {
+    log.Info().Err(err).Msg("Provider does not exist on network or is not connected...")
+    if ierr := initProviderOnChain(a.wallet, cfg.Ip, cfg.TotalSpace); ierr != nil {
+        return ierr
+    }
+    // re-query to populate `res`
+    res, err = a.queryClient.Provider(context.Background(), queryParams)
+    if err != nil {
+        return err
+    }
 }
♻️ Duplicate comments (2)
core/app.go (2)

236-238: int8 arithmetic & overflow ‒ issue remains

Using int8 for offsetWalletCount can still overflow and forces an unnecessary cast right afterwards. The same concern was raised earlier but is still unresolved.

-offsetWalletCount := cfg.QueueConfig.QueueThreads + int8(cfg.StrayManagerCfg.HandCount)
-offsetWallets, err := sequoiaWallet.CreateOffsetWallets(a.wallet, int(offsetWalletCount))
+offsetWalletCount := int(cfg.QueueConfig.QueueThreads) + int(cfg.StrayManagerCfg.HandCount)
+offsetWallets, err := sequoiaWallet.CreateOffsetWallets(a.wallet, offsetWalletCount)

367-370: Shadowed error variable ‒ real error lost

Still returning the wrong variable; err is unrelated here. Should return nerr.

-grantMsg, nerr := feegrant.NewMsgGrantAllowance(&allowance, wadd, hadd)
-if nerr != nil {
-    return err
-}
+grantMsg, nerr := feegrant.NewMsgGrantAllowance(&allowance, wadd, hadd)
+if nerr != nil {
+    return nerr
+}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 52c60ed and a70c565.

📒 Files selected for processing (1)
  • core/app.go (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: lint
  • GitHub Check: cov

Comment on lines +379 to +380
log.Info().Str("claimer", claimer.AccAddress()).Type("msg_type", msg).Str("tx_hash", res.TxHash).Msg("added new claimer")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

zerolog builder has no Type method – compilation failure

github.com/rs/zerolog/log.Event provides Str, Int, Interface, etc., but not Type. Replace with Str (or another supported call).

-log.Info().Str("claimer", claimer.AccAddress()).Type("msg_type", msg).Str("tx_hash", res.TxHash).Msg("added new claimer")
+log.Info().
+    Str("claimer", claimer.AccAddress()).
+    Str("msg_type", msg.Type()).
+    Str("tx_hash", res.TxHash).
+    Msg("added new claimer")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
log.Info().Str("claimer", claimer.AccAddress()).Type("msg_type", msg).Str("tx_hash", res.TxHash).Msg("added new claimer")
// before (core/app.go:379-380)
- log.Info().Str("claimer", claimer.AccAddress()).Type("msg_type", msg).Str("tx_hash", res.TxHash).Msg("added new claimer")
+ log.Info().
+ Str("claimer", claimer.AccAddress()).
+ Str("msg_type", msg.Type()).
+ Str("tx_hash", res.TxHash).
+ Msg("added new claimer")
🤖 Prompt for AI Agents
In core/app.go at lines 379-380, the zerolog log event incorrectly uses a
non-existent Type method, causing a compilation failure. Replace the
Type("msg_type", msg) call with Str("msg_type", msg) or another appropriate
supported method to log the message type as a string.

Comment on lines +289 to +306
queueWallets := offsetWallets[:cfg.QueueConfig.QueueThreads]
a.q, err = queue.NewPool(a.wallet, a.queryClient, queueWallets, cfg.QueueConfig)
if err != nil {
log.Error().Err(err).Msg("failed to initialize Queue module")
return err
}

a.q = queue.NewQueue(a.wallet, cfg.QueueInterval)
go a.q.Listen()

prover := proofs.NewProver(a.wallet, a.q, a.fileSystem, cfg.ProofInterval, cfg.ProofThreads, int(params.ChunkSize))
prover := proofs.NewProver(a.wallet, a.queryClient, a.q, a.fileSystem, cfg.ProofInterval, cfg.ProofThreads, int(params.ChunkSize))

myUrl := cfg.Ip

log.Info().Msg(fmt.Sprintf("Provider started as: %s", myAddress))

a.prover = prover
a.strayManager = strays.NewStrayManager(a.wallet, a.q, cfg.StrayManagerCfg.CheckInterval, cfg.StrayManagerCfg.RefreshInterval, cfg.StrayManagerCfg.HandCount, claimers)
handWallets := offsetWallets[cfg.QueueConfig.QueueThreads:]
strayManager, err := strays.NewStrayManager(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Slice indices must be int – code will not compile

cfg.QueueConfig.QueueThreads is int8; using it directly in slice expressions causes:

invalid slice index cfg.QueueConfig.QueueThreads (type int8)

Fix by casting once and re-using the int value:

-threadCount := cfg.QueueConfig.QueueThreads
-queueWallets := offsetWallets[:threadCount]
-handWallets := offsetWallets[threadCount:]
+threadCount := int(cfg.QueueConfig.QueueThreads)
+queueWallets := offsetWallets[:threadCount]
+handWallets := offsetWallets[threadCount:]

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In core/app.go around lines 289 to 306, the slice indices use
cfg.QueueConfig.QueueThreads which is an int8, causing a compilation error
because slice indices must be int. Fix this by casting
cfg.QueueConfig.QueueThreads to int once, storing it in a variable, and then use
that int variable for all slice operations to ensure the code compiles
correctly.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (4)
core/app.go (4)

238-239: Integer overflow risk in offsetWalletCount calculation remains unaddressed.

This is the same issue identified in previous reviews where int8 could overflow if the sum exceeds 127.


299-300: Slice index compilation error with int8 type remains unaddressed.

The slice operations still use cfg.QueueConfig.QueueThreads which is int8, causing compilation failures as slice indices must be int.

Also applies to: 315-315


377-380: Critical error variable bug in authorizeClaimer remains unaddressed.

The error check incorrectly returns err instead of nerr, causing the real error from NewMsgGrantAllowance to be ignored.


389-389: Zerolog compilation error with non-existent Type method remains unaddressed.

The zerolog Event type doesn't have a Type method, causing compilation failure.

🧹 Nitpick comments (2)
core/app.go (2)

285-292: Consider potential partial failure in claimer authorization loop.

While the authorization logic is correct, if the loop fails partway through, some wallets will be authorized while others won't be. Consider whether this partial state is acceptable or if rollback logic is needed.


335-335: Consider refactoring the lengthy parameter list.

The api.Serve call has many parameters which makes it harder to maintain and understand. Consider using a configuration struct or builder pattern.

-go a.api.Serve(a.fileSystem, a.prover, a.wallet, a.queryClient, myUrl, params.ChunkSize)
+serverConfig := &api.ServerConfig{
+    FileSystem:  a.fileSystem,
+    Prover:      a.prover,
+    Wallet:      a.wallet,
+    QueryClient: a.queryClient,
+    URL:         myUrl,
+    ChunkSize:   params.ChunkSize,
+}
+go a.api.Serve(serverConfig)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a70c565 and c9a1a0b.

📒 Files selected for processing (1)
  • core/app.go (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Test
  • GitHub Check: cov
  • GitHub Check: lint
  • GitHub Check: Build
🔇 Additional comments (2)
core/app.go (2)

61-70: Well-implemented functional options pattern for dependency injection.

The WithTestMode option provides clean dependency injection for testing by replacing the real clients with fake implementations. This is a solid approach for making the code testable.


244-259: Proper error handling for provider registration.

The logic correctly handles the NotFound gRPC status code to register a new provider on-chain when it doesn't exist. The error handling and logging are appropriate for this critical initialization step.

dahn510 added 2 commits June 23, 2025 15:48
route postproof message to auth claimers and the rest to main wallet
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
queue/pool.go (1)

109-109: Potential integer overflow when casting worker index.

Casting i to int8 could overflow if there are more than 127 workers, leading to negative worker IDs.

+if i > 127 {
+    panic("too many workers: maximum 127 workers supported")
+}
 worker := newWorker(int8(i), w, txTimer, batchSize, maxRetryAttempt, wChannels[i])
🧹 Nitpick comments (1)
queue/pool.go (1)

40-46: Consider adding synchronization for graceful shutdown.

The Stop method closes channels but doesn't wait for workers to finish processing. This could lead to goroutine leaks or sends on closed channels if Add is called after Stop.

Consider tracking worker state and implementing graceful shutdown:

type Pool struct {
    // ... existing fields ...
    stopped bool
    mu      sync.RWMutex
    wg      sync.WaitGroup
}

func (p *Pool) Stop() {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.stopped {
        return
    }
    p.stopped = true
    close(p.rootQueue)
    for _, c := range p.offsetQueue {
        close(c)
    }
    p.wg.Wait() // Wait for all workers to finish
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7d35001 and fbd533d.

📒 Files selected for processing (2)
  • queue/pool.go (1 hunks)
  • queue/worker.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • queue/worker.go
🧰 Additional context used
🧬 Code Graph Analysis (1)
queue/pool.go (3)
queue/queue.go (1)
  • Queue (9-13)
queue/types.go (1)
  • Message (12-18)
config/types.go (1)
  • QueueConfig (143-153)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: cov
  • GitHub Check: lint
  • GitHub Check: Test
  • GitHub Check: Build
🔇 Additional comments (1)
queue/pool.go (1)

62-62: Clarify the purpose of msgIndex field.

The comment indicates uncertainty about whether msgIndex is still relevant. Please clarify if this field is still needed or remove it from the Message struct if it's deprecated.

Comment on lines +37 to +38
return pool, nil
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Remove error return or add error handling.

The function always returns nil for the error. Either remove the error return from the signature or add proper error handling for potential failures (e.g., validate inputs, handle errors from worker creation).

-func NewPool(main *wallet.Wallet, queryClient storageTypes.QueryClient, workerWallets []*wallet.Wallet, config config.QueueConfig) (*Pool, error) {
+func NewPool(main *wallet.Wallet, queryClient storageTypes.QueryClient, workerWallets []*wallet.Wallet, config config.QueueConfig) *Pool {
    // ... existing code ...
-   return pool, nil
+   return pool
}
🤖 Prompt for AI Agents
In queue/pool.go around lines 37 to 38, the function returns an error value that
is always nil, which is unnecessary. Either remove the error return type from
the function signature if no errors can occur, or add proper error handling by
validating inputs and handling any errors from worker creation before returning.
Adjust the return statements accordingly to match the updated signature or error
handling logic.

Comment on lines +26 to +38
func NewPool(main *wallet.Wallet, queryClient storageTypes.QueryClient, workerWallets []*wallet.Wallet, config config.QueueConfig) (*Pool, error) {
root, rootQueue := createWorkers([]*wallet.Wallet{main}, int(config.TxTimer), int(config.TxBatchSize), config.MaxRetryAttempt)
workers, workerChannels := createWorkers(workerWallets, int(config.TxTimer), int(config.TxBatchSize), config.MaxRetryAttempt)

pool := &Pool{
root: root[0],
rootQueue: rootQueue[0],
offsets: workers,
offsetQueue: workerChannels,
}

return pool, nil
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Remove unused queryClient parameter or implement its usage.

The queryClient parameter is passed to the constructor but never used. Either remove it if it's not needed, or implement its intended usage within the Pool or pass it to the workers.

If the parameter is not needed:

-func NewPool(main *wallet.Wallet, queryClient storageTypes.QueryClient, workerWallets []*wallet.Wallet, config config.QueueConfig) (*Pool, error) {
+func NewPool(main *wallet.Wallet, workerWallets []*wallet.Wallet, config config.QueueConfig) (*Pool, error) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func NewPool(main *wallet.Wallet, queryClient storageTypes.QueryClient, workerWallets []*wallet.Wallet, config config.QueueConfig) (*Pool, error) {
root, rootQueue := createWorkers([]*wallet.Wallet{main}, int(config.TxTimer), int(config.TxBatchSize), config.MaxRetryAttempt)
workers, workerChannels := createWorkers(workerWallets, int(config.TxTimer), int(config.TxBatchSize), config.MaxRetryAttempt)
pool := &Pool{
root: root[0],
rootQueue: rootQueue[0],
offsets: workers,
offsetQueue: workerChannels,
}
return pool, nil
}
func NewPool(main *wallet.Wallet, workerWallets []*wallet.Wallet, config config.QueueConfig) (*Pool, error) {
root, rootQueue := createWorkers([]*wallet.Wallet{main}, int(config.TxTimer), int(config.TxBatchSize), config.MaxRetryAttempt)
workers, workerChannels := createWorkers(workerWallets, int(config.TxTimer), int(config.TxBatchSize), config.MaxRetryAttempt)
pool := &Pool{
root: root[0],
rootQueue: rootQueue[0],
offsets: workers,
offsetQueue: workerChannels,
}
return pool, nil
}
🤖 Prompt for AI Agents
In queue/pool.go between lines 26 and 38, the queryClient parameter is declared
in the NewPool function signature but never used inside the function. To fix
this, either remove the queryClient parameter from the NewPool function
signature if it is not required, or if it is intended to be used, integrate it
properly by passing it to the Pool struct or to the workers during their
creation. Ensure the function signature and implementation are consistent.

Comment on lines +71 to +83
func (p *Pool) sendToQueue(msg *Message) (workerId string) {
// Auth claimers can sign and broadcast MsgPostProof but
// other messages must be signed by main wallet
switch msg.msg.(type) {
case *storageTypes.MsgPostProof:
to := p.sendToOffsets(msg)
return p.offsets[to].Id()

default:
p.rootQueue <- msg
return p.root.Id() // 0 is always root
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add validation for offset workers availability.

The code assumes offset workers exist when handling MsgPostProof. If p.offsetQueue is empty, the sendToOffsets method would panic.

 func (p *Pool) sendToQueue(msg *Message) (workerId string) {
     // Auth claimers can sign and broadcast MsgPostProof but
     // other messages must be signed by main wallet
     switch msg.msg.(type) {
     case *storageTypes.MsgPostProof:
+        if len(p.offsetQueue) == 0 {
+            log.Error().Msg("No offset workers available for MsgPostProof")
+            p.rootQueue <- msg
+            return p.root.Id()
+        }
         to := p.sendToOffsets(msg)
         return p.offsets[to].Id()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (p *Pool) sendToQueue(msg *Message) (workerId string) {
// Auth claimers can sign and broadcast MsgPostProof but
// other messages must be signed by main wallet
switch msg.msg.(type) {
case *storageTypes.MsgPostProof:
to := p.sendToOffsets(msg)
return p.offsets[to].Id()
default:
p.rootQueue <- msg
return p.root.Id() // 0 is always root
}
}
func (p *Pool) sendToQueue(msg *Message) (workerId string) {
// Auth claimers can sign and broadcast MsgPostProof but
// other messages must be signed by main wallet
switch msg.msg.(type) {
case *storageTypes.MsgPostProof:
if len(p.offsetQueue) == 0 {
log.Error().Msg("No offset workers available for MsgPostProof")
p.rootQueue <- msg
return p.root.Id()
}
to := p.sendToOffsets(msg)
return p.offsets[to].Id()
default:
p.rootQueue <- msg
return p.root.Id() // 0 is always root
}
}
🤖 Prompt for AI Agents
In queue/pool.go around lines 71 to 83, the sendToQueue function assumes offset
workers exist when handling MsgPostProof, which can cause a panic if
p.offsetQueue is empty. Add a validation check before calling sendToOffsets to
ensure offset workers are available; if not, fallback to sending the message to
the rootQueue and return the root worker ID. This prevents panics by handling
the empty offset queue case gracefully.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (4)
core/app.go (4)

238-242: Integer overflow risk in offsetWalletCount calculation

The calculation could overflow if the sum exceeds 127 (int8 max value), which would cause runtime issues.


296-296: Slice index compilation error with int8 type

Using cfg.QueueConfig.QueueThreads (int8) directly as a slice index will cause a compilation error since slice indices must be int.


374-377: Incorrect error variable returned in authorizeClaimer

The error check returns err instead of nerr, which ignores the actual error from feegrant.NewMsgGrantAllowance.


386-387: Zerolog Type method doesn't exist - compilation failure

The zerolog Event struct doesn't have a Type method, which will cause compilation to fail.

🧹 Nitpick comments (1)
core/app.go (1)

358-389: Consider adding transaction retry logic

The authorizeClaimer method handles the core functionality correctly, but consider adding retry logic for failed transactions to improve reliability in network-unstable conditions.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fbd533d and 28579bc.

📒 Files selected for processing (4)
  • core/app.go (5 hunks)
  • queue/pool_bench_test.go (1 hunks)
  • queue/pool_test.go (1 hunks)
  • queue/worker_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • queue/pool_test.go
  • queue/pool_bench_test.go
  • queue/worker_test.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Build
  • GitHub Check: cov
  • GitHub Check: Test
  • GitHub Check: lint
🔇 Additional comments (6)
core/app.go (6)

51-51: Good refactoring to use queue interface

The change from *queue.Queue to queue.Queue improves abstraction and enables better testability with the new Pool implementation.


58-58: Excellent addition of centralized query client

Adding the queryClient field to the App struct centralizes query client usage and improves dependency injection patterns throughout the application.


61-70: Well-implemented options pattern for dependency injection

The Option type and WithTestMode function provide a clean way to inject test dependencies, improving testability.


244-256: Robust provider registration logic

The provider existence checking and automatic registration logic handles the case where providers don't exist on-chain gracefully.


282-289: Efficient claimer authorization with batch processing

The logic to check and authorize missing claimers is well-implemented, using slices.Contains for efficient membership testing.


297-297: Proper queue pool initialization

The transition to using queue.NewPool with distributed wallets aligns well with the multithreading objectives of this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants