diff --git a/Makefile b/Makefile index aeb168818..a82b52877 100644 --- a/Makefile +++ b/Makefile @@ -18,10 +18,10 @@ TARGET_OS?=$(shell uname) export TARGET_OS ROLLUPS_NODE_VERSION := 2.0.0-alpha.9 -ROLLUPS_CONTRACTS_VERSION := 2.1.1 +ROLLUPS_CONTRACTS_VERSION := 2.2.0 ROLLUPS_CONTRACTS_URL:=https://github.com/cartesi/rollups-contracts/releases/download/ ROLLUPS_CONTRACTS_ARTIFACT:=rollups-contracts-$(ROLLUPS_CONTRACTS_VERSION)-artifacts.tar.gz -ROLLUPS_CONTRACTS_SHA256:=2e7a105d656de2adafad6439a5ff00f35b997aaf27972bd1becc33dea8817861 +ROLLUPS_CONTRACTS_SHA256:=31c20a8c50f794185957ebd6e554fc99c8e01f0fdf9a80628d031fb0edc7091d ROLLUPS_PRT_CONTRACTS_VERSION := 2.1.0 ROLLUPS_PRT_CONTRACTS_URL:=https://github.com/cartesi/dave/releases/download/ ROLLUPS_PRT_CONTRACTS_ARTIFACT:=cartesi-rollups-prt-$(ROLLUPS_PRT_CONTRACTS_VERSION)-contract-artifacts.tar.gz diff --git a/internal/claimer/blockchain.go b/internal/claimer/blockchain.go index e148460dd..25255b712 100644 --- a/internal/claimer/blockchain.go +++ b/internal/claimer/blockchain.go @@ -6,7 +6,6 @@ package claimer import ( "context" "fmt" - "iter" "log/slog" "math/big" @@ -15,8 +14,6 @@ import ( "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/ethutil" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -61,7 +58,7 @@ type iclaimerBlockchain interface { error, ) - getBlockNumber(ctx context.Context) (*big.Int, error) + getCommitmentBlockNumber(ctx context.Context) (*big.Int, error) getConsensusAddress( ctx context.Context, @@ -77,51 +74,87 @@ type claimerBlockchain struct { defaultBlock config.DefaultBlock } -func (self *claimerBlockchain) submitClaimToBlockchain( +func (cb *claimerBlockchain) submitClaimToBlockchain( ic *iconsensus.IConsensus, application *model.Application, epoch *model.Epoch, ) (common.Hash, error) { - txHash := common.Hash{} + maybeTxHash := common.Hash{} + if cb.txOpts == nil { + return maybeTxHash, fmt.Errorf("txOpts is required for claim submission") + } lastBlockNumber := new(big.Int).SetUint64(epoch.LastBlock) - tx, err := ic.SubmitClaim(self.txOpts, application.IApplicationAddress, + tx, err := ic.SubmitClaim(cb.txOpts, application.IApplicationAddress, lastBlockNumber, *epoch.OutputsMerkleRoot) if err != nil { - self.logger.Error("submitClaimToBlockchain:failed", + cb.logger.Error("submitClaimToBlockchain:failed", "appContractAddress", application.IApplicationAddress, "claimHash", *epoch.OutputsMerkleRoot, "last_block", epoch.LastBlock, "error", err) } else { - txHash = tx.Hash() - self.logger.Debug("submitClaimToBlockchain:success", + maybeTxHash = tx.Hash() + cb.logger.Debug("submitClaimToBlockchain:success", "appContractAddress", application.IApplicationAddress, "claimHash", *epoch.OutputsMerkleRoot, "last_block", epoch.LastBlock, - "TxHash", txHash) + "TxHash", maybeTxHash) } - return txHash, err + return maybeTxHash, err } -func unwrapClaimSubmitted( - ic *iconsensus.IConsensus, - pull func() (log *types.Log, err error, ok bool), +type EventIterator interface { + Next() bool + Close() error +} + +func newOracle( + nr func(*bind.CallOpts) (*big.Int, error), ) ( - *iconsensus.IConsensusClaimSubmitted, - bool, - error, + func(ctx context.Context, block uint64) (*big.Int, error), ) { - log, err, ok := pull() - if !ok || err != nil { - return nil, false, err + return func(ctx context.Context, block uint64) (*big.Int, error) { + callOpts := &bind.CallOpts{ + Context: ctx, + BlockNumber: new(big.Int).SetUint64(block), + } + numEvents, err := nr(callOpts) + + if err != nil { + return nil, fmt.Errorf("failed to get event count in block %d: %w", block, err) + } + return numEvents, nil + } +} + +func newOnHit[IT EventIterator]( + ctx context.Context, + address common.Address, + filter func (*bind.FilterOpts, []common.Address, []common.Address) (IT, error), + onEvent func(IT), +) ( + func(block uint64) error, +) { + return func(block uint64) error { + filterOpts := &bind.FilterOpts{ + Context: ctx, + Start: block, + End: &block, + } + it, err := filter(filterOpts, nil, []common.Address{address}) + if err != nil { + return fmt.Errorf("failed to retrieve events at block %d: %w", block, err) + } + for it.Next() { + onEvent(it) + } + return it.Close() } - ev, err := ic.ParseClaimSubmitted(*log) - return ev, true, err } // scan the event stream for a claimSubmitted event that matches claim. // return this event and its successor -func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( +func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc( ctx context.Context, application *model.Application, epoch *model.Epoch, @@ -132,78 +165,44 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( *iconsensus.IConsensusClaimSubmitted, error, ) { - ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client) + ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client) if err != nil { return nil, nil, nil, err } - - // filter must match: - // - `ClaimSubmitted` events - // - submitter == nil (any) - // - appContract == claim.IApplicationAddress - c, err := iconsensus.IConsensusMetaData.GetAbi() - topics, err := abi.MakeTopics( - []any{c.Events[model.MonitoredEvent_ClaimSubmitted.String()].ID}, - nil, - []any{application.IApplicationAddress}, + oracle := newOracle(ic.GetNumberOfSubmittedClaims) + events := []*iconsensus.IConsensusClaimSubmitted{} + onHit := newOnHit(ctx, application.IApplicationAddress, ic.FilterClaimSubmitted, + func(it *iconsensus.IConsensusClaimSubmittedIterator) { + event := it.Event + if (len(events) == 0) && claimSubmittedEventMatches(application, epoch, event) { + events = append(events, event) + } else if len(events) != 0 { + events = append(events, event) + } + }, ) - if err != nil { - return nil, nil, nil, err - } - it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(epoch.LastBlock), - ToBlock: endBlock, - Addresses: []common.Address{application.IConsensusAddress}, - Topics: topics, - }) + numSubmittedClaims, err := oracle(ctx, epoch.LastBlock) if err != nil { return nil, nil, nil, err } - - // pull events instead of iterating - next, stop := iter.Pull2(it) - defer stop() - for { - event, ok, err := unwrapClaimSubmitted(ic, next) - if !ok || err != nil { - return ic, event, nil, err - } - lastBlock := event.LastProcessedBlockNumber.Uint64() - - if claimSubmittedEventMatches(application, epoch, event) { - // found the event, does it has a successor? try to fetch it - succ, ok, err := unwrapClaimSubmitted(ic, next) - if !ok || err != nil { - return ic, event, nil, err - } - return ic, event, succ, err - } else if lastBlock > epoch.LastBlock { - err = fmt.Errorf("No matching claim, searched up to %v", event) - return nil, nil, nil, err - } + _, err = ethutil.FindTransitions(ctx, epoch.LastBlock, endBlock.Uint64(), numSubmittedClaims, oracle, onHit) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to walk ClaimSubmitted transitions: %w", err) } -} -func unwrapClaimAccepted( - ic *iconsensus.IConsensus, - pull func() (log *types.Log, err error, ok bool), -) ( - *iconsensus.IConsensusClaimAccepted, - bool, - error, -) { - log, err, ok := pull() - if !ok || err != nil { - return nil, false, err + if len(events) == 0 { + return ic, nil, nil, nil + } else if len(events) == 1 { + return ic, events[0], nil, nil + } else { + return ic, events[0], events[1], nil } - ev, err := ic.ParseClaimAccepted(*log) - return ev, true, err } // scan the event stream for a claimAccepted event that matches claim. // return this event and its successor -func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( +func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc( ctx context.Context, application *model.Application, epoch *model.Epoch, @@ -214,91 +213,79 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( *iconsensus.IConsensusClaimAccepted, error, ) { - ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client) + ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client) if err != nil { return nil, nil, nil, err } - // filter must match: - // - `ClaimAccepted` events - // - appContract == claim.IApplicationAddress - c, err := iconsensus.IConsensusMetaData.GetAbi() - topics, err := abi.MakeTopics( - []any{c.Events[model.MonitoredEvent_ClaimAccepted.String()].ID}, - []any{application.IApplicationAddress}, + oracle := newOracle(ic.GetNumberOfAcceptedClaims) + events := []*iconsensus.IConsensusClaimAccepted{} + filter := func( + opts *bind.FilterOpts, + _ []common.Address, + appContract []common.Address, + ) (*iconsensus.IConsensusClaimAcceptedIterator, error) { + return ic.FilterClaimAccepted(opts, appContract) + } + onHit := newOnHit(ctx, application.IApplicationAddress, filter, + func(it *iconsensus.IConsensusClaimAcceptedIterator) { + event := it.Event + if (len(events) == 0) && claimAcceptedEventMatches(application, epoch, event) { + events = append(events, event) + } else if len(events) != 0 { + events = append(events, event) + } + }, ) + + numAcceptedClaims, err := oracle(ctx, epoch.LastBlock) if err != nil { return nil, nil, nil, err } - - it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(epoch.LastBlock), - ToBlock: endBlock, - Addresses: []common.Address{application.IConsensusAddress}, - Topics: topics, - }) + _, err = ethutil.FindTransitions(ctx, epoch.LastBlock, endBlock.Uint64(), numAcceptedClaims, oracle, onHit) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, fmt.Errorf("failed to walk ClaimAccepted transitions: %w", err) } - // pull events instead of iterating - next, stop := iter.Pull2(it) - defer stop() - for { - event, ok, err := unwrapClaimAccepted(ic, next) - if !ok || err != nil { - return ic, event, nil, err - } - lastBlock := event.LastProcessedBlockNumber.Uint64() - - if claimAcceptedEventMatches(application, epoch, event) { - // found the event, does it has a successor? try to fetch it - succ, ok, err := unwrapClaimAccepted(ic, next) - if !ok || err != nil { - return ic, event, nil, err - } - return ic, event, succ, err - } else if lastBlock > epoch.LastBlock { - err = fmt.Errorf("No matching claim, searched up to %v", event) - return nil, nil, nil, err - } + if len(events) == 0 { + return ic, nil, nil, nil + } else if len(events) == 1 { + return ic, events[0], nil, nil + } else { + return ic, events[0], events[1], nil } } -func (self *claimerBlockchain) getConsensusAddress( +func (cb *claimerBlockchain) getConsensusAddress( ctx context.Context, app *model.Application, ) (common.Address, error) { - return ethutil.GetConsensus(ctx, self.client, app.IApplicationAddress) + return ethutil.GetConsensus(ctx, cb.client, app.IApplicationAddress) } -/* poll a transaction hash for its submission status and receipt */ -func (self *claimerBlockchain) pollTransaction( +// poll a transaction for its receipt +func (cb *claimerBlockchain) pollTransaction( ctx context.Context, txHash common.Hash, - endBlock *big.Int, + commitmentBlockNumber *big.Int, ) (bool, *types.Receipt, error) { - _, isPending, err := self.client.TransactionByHash(ctx, txHash) - if err != nil || isPending { - return false, nil, err - } - - receipt, err := self.client.TransactionReceipt(ctx, txHash) + receipt, err := cb.client.TransactionReceipt(ctx, txHash) if err != nil { return false, nil, err } - if receipt.BlockNumber.Cmp(endBlock) >= 0 { - return false, receipt, err + // receipt must be committed before use. Return false until it is. + if receipt.BlockNumber.Cmp(commitmentBlockNumber) >= 0 { + return false, nil, nil } - return receipt.Status == 1, receipt, err + return receipt.Status == 1, receipt, nil } -/* Retrieve the block number of "DefaultBlock" */ -func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) { +/* Retrieve the block number for the configured commitment level */ +func (cb *claimerBlockchain) getCommitmentBlockNumber(ctx context.Context) (*big.Int, error) { var nr int64 - switch self.defaultBlock { + switch cb.defaultBlock { case model.DefaultBlock_Pending: nr = rpc.PendingBlockNumber.Int64() case model.DefaultBlock_Latest: @@ -308,10 +295,10 @@ func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, er case model.DefaultBlock_Safe: nr = rpc.SafeBlockNumber.Int64() default: - return nil, fmt.Errorf("default block '%v' not supported", self.defaultBlock) + return nil, fmt.Errorf("default block '%v' not supported", cb.defaultBlock) } - hdr, err := self.client.HeaderByNumber(ctx, big.NewInt(nr)) + hdr, err := cb.client.HeaderByNumber(ctx, big.NewInt(nr)) if err != nil { return nil, err } diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go index f852d448f..f2f16cf2a 100644 --- a/internal/claimer/claimer.go +++ b/internal/claimer/claimer.go @@ -51,9 +51,9 @@ import ( ) var ( - ErrClaimMismatch = fmt.Errorf("Claim and antecessor mismatch") - ErrEventMismatch = fmt.Errorf("Computed Claim mismatches ClaimSubmitted event") - ErrMissingEvent = fmt.Errorf("Accepted claim has no matching blockchain event") + ErrClaimMismatch = fmt.Errorf("constraints failed for epoch claim and its successor.") + ErrEventMismatch = fmt.Errorf("epoch claim does not match its corresponding event.") + ErrMissingEvent = fmt.Errorf("epoch claim does not have a corresponding event.") ) type iclaimerRepository interface { @@ -97,7 +97,7 @@ type iclaimerRepository interface { LoadNodeConfigRaw(ctx context.Context, key string) (rawJSON []byte, createdAt, updatedAt time.Time, err error) } -/* transition claims from computed to submitted */ +// transition epoch claims from computed to submitted. func (s *Service) submitClaimsAndUpdateDatabase( acceptedOrSubmittedEpochs map[int64]*model.Epoch, computedEpochs map[int64]*model.Epoch, @@ -128,12 +128,22 @@ func (s *Service) submitClaimsAndUpdateDatabase( computedEpoch.Index, receipt.TxHash, ) + + // NOTE: there is no point in trying the other claims on a database error + // so we just return and try again on the next tick if err != nil { errs = append(errs, err) return errs } + + // we expect apps[key] to always exist, + // but guard its use behind this `if` to ensure there is no panic if we are wrong. + appAddress := common.Address{} + if app, ok := apps[key]; ok { + appAddress = app.IApplicationAddress + } s.Logger.Info("Claim submitted", - "app", apps[key].IApplicationAddress, + "app", appAddress, "receipt_block_number", receipt.BlockNumber, "claim_hash", fmt.Sprintf("%x", computedEpoch.OutputsMerkleRoot), "last_block", computedEpoch.LastBlock, @@ -146,7 +156,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( delete(s.claimsInFlight, key) } - // check computed epochs +nextApp: // check computed epochs. NOTE: map mutation + iteration is safe in Go for key, currEpoch := range computedEpochs { var ic *iconsensus.IConsensus var prevClaimSubmissionEvent *iconsensus.IConsensusClaimSubmitted @@ -163,7 +173,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err := s.checkConsensusForAddressChange(app); err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if previousEpochExists { err := checkEpochSequenceConstraint(prevEpoch, currEpoch) @@ -172,7 +182,8 @@ func (s *Service) submitClaimsAndUpdateDatabase( s.Context, app.IApplicationAddress, prevEpoch.ApplicationID, - "database mismatch on epochs. application: %v, epochs: %v (%v), %v (%v).", + "%v. application: %v, epochs: %v (%v), %v (%v).", + err, app.IApplicationAddress, prevEpoch.Index, prevEpoch.VirtualIndex, @@ -181,7 +192,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } // the previous epoch must have a matching claim submission event. @@ -191,7 +202,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if prevClaimSubmissionEvent == nil { err = s.setApplicationInoperable( @@ -205,7 +216,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if !claimSubmittedEventMatches(app, prevEpoch, prevClaimSubmissionEvent) { s.Logger.Error("event mismatch", @@ -217,15 +228,14 @@ func (s *Service) submitClaimsAndUpdateDatabase( s.Context, app.IApplicationAddress, app.ID, - "epoch has an invalid event: %v, epoch: %v (%v). event: %v", - currEpoch.Index, + "claim of epoch: %v (%v), does not match event with tx_hash: %v", prevEpoch.Index, prevEpoch.VirtualIndex, - prevClaimSubmissionEvent, + prevClaimSubmissionEvent.Raw.TxHash, ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } } else { // first claim @@ -234,7 +244,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } } @@ -249,12 +259,14 @@ func (s *Service) submitClaimsAndUpdateDatabase( s.Context, app.IApplicationAddress, app.ID, - "computed claim does not match event. computed_claim=%v, current_event=%v", - currEpoch, currClaimSubmissionEvent, + "claim of epoch: %v (%v), does not match event with tx_hash: %v", + currEpoch.Index, + currEpoch.VirtualIndex, + currClaimSubmissionEvent.Raw.TxHash, ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } s.Logger.Debug("Updating claim status to submitted", "app", app.IApplicationAddress, @@ -271,7 +283,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } delete(s.claimsInFlight, key) s.Logger.Info("Claim previously submitted", @@ -287,7 +299,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( "claim_hash", fmt.Sprintf("%x", prevEpoch.OutputsMerkleRoot), "last_block", prevEpoch.LastBlock, ) - goto nextApp + continue nextApp } s.Logger.Debug("Submitting claim to blockchain", "app", app.IApplicationAddress, @@ -298,7 +310,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } s.claimsInFlight[key] = txHash } else { @@ -309,12 +321,11 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) } - nextApp: } return errs } -/* transition claims from submitted to accepted */ +// transition claims from submitted to accepted func (s *Service) acceptClaimsAndUpdateDatabase( acceptedEpochs map[int64]*model.Epoch, submittedEpochs map[int64]*model.Epoch, @@ -324,7 +335,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( errs := []error{} var err error - // check submitted claims +nextApp: // check submitted epochs. NOTE: map mutation + iteration is safe in Go for key, submittedEpoch := range submittedEpochs { var prevEvent *iconsensus.IConsensusClaimAccepted var currEvent *iconsensus.IConsensusClaimAccepted @@ -335,50 +346,66 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err := s.checkConsensusForAddressChange(app); err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if prevExists { err := checkEpochSequenceConstraint(acceptedEpoch, submittedEpoch) if err != nil { - s.Logger.Error("Database mismatch on epochs.", + s.Logger.Error("epoch sequence check failed.", "app", app.IApplicationAddress, "previous_epoch_index", acceptedEpoch.Index, "current_epoch_index", submittedEpoch.Index, "err", err, ) + err = s.setApplicationInoperable( + s.Context, + app.IApplicationAddress, + app.ID, + "%v. application: %v", + err, + app.IApplicationAddress, + ) delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } - // if prevClaimRow exists, there must be a matching event + // if an epoch was accepted, there must exist a matching event for it _, prevEvent, currEvent, err = s.blockchain.findClaimAcceptedEventAndSucc(s.Context, app, acceptedEpoch, endBlock) if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if prevEvent == nil { - s.Logger.Error("Missing event", + s.Logger.Error("accepted epoch has no matching event", "app", app.IApplicationAddress, "claim", acceptedEpoch, "err", ErrMissingEvent, ) delete(submittedEpochs, key) errs = append(errs, ErrMissingEvent) - goto nextApp + continue nextApp } if !claimAcceptedEventMatches(app, acceptedEpoch, prevEvent) { - s.Logger.Error("Event mismatch", + s.Logger.Error("accepted epoch does not match event", "app", app.IApplicationAddress, "claim", acceptedEpoch, "event", prevEvent, "err", ErrEventMismatch, ) + err = s.setApplicationInoperable( + s.Context, + app.IApplicationAddress, + app.ID, + "epoch: %v does not match event with tx_hash: %v", + acceptedEpoch.Index, + prevEvent.Raw.TxHash, + ) delete(submittedEpochs, key) - errs = append(errs, ErrEventMismatch) - goto nextApp + errs = append(errs, err) + continue nextApp } } else { // first claim @@ -387,7 +414,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } } @@ -403,9 +430,17 @@ func (s *Service) acceptClaimsAndUpdateDatabase( "event", currEvent, "err", ErrEventMismatch, ) + err := s.setApplicationInoperable( + s.Context, + app.IApplicationAddress, + app.ID, + "event mismatch for epoch %v, event tx_hash: %v", + acceptedEpoch.Index, + prevEvent.Raw.TxHash, + ) delete(submittedEpochs, key) - errs = append(errs, ErrEventMismatch) - goto nextApp + errs = append(errs, err) + continue nextApp } s.Logger.Debug("Updating claim status to accepted", "app", app.IApplicationAddress, @@ -417,7 +452,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } s.Logger.Info("Claim accepted", "app", currEvent.AppContract, @@ -427,7 +462,6 @@ func (s *Service) acceptClaimsAndUpdateDatabase( "tx", txHash, ) } - nextApp: } return errs } @@ -443,18 +477,12 @@ func (s *Service) setApplicationInoperable( ) error { reason := fmt.Sprintf(reasonFmt, args...) appAddress := iApplicationAddress.String() - - // Log the reason first s.Logger.Error(reason, "application", appAddress) - - // Update application state err := s.repository.UpdateApplicationState(ctx, id, model.ApplicationState_Inoperable, &reason) if err != nil { s.Logger.Error("failed to update application state to inoperable", "app", appAddress, "err", err) } - - // Return the error with the reason - return errors.New(reason) + return errors.Join(errors.New(reason), err) } func (s *Service) checkConsensusForAddressChange( @@ -477,17 +505,25 @@ func (s *Service) checkConsensusForAddressChange( return nil } -func checkEpochConstraint(c *model.Epoch) error { - if c.FirstBlock > c.LastBlock { - return fmt.Errorf("unexpected epoch state. first_block: %v > last_block: %v", c.FirstBlock, c.LastBlock) +func checkEpochConstraint(epoch *model.Epoch) error { + if epoch.FirstBlock > epoch.LastBlock { + return fmt.Errorf("unexpected epoch state. first_block: %v > last_block: %v", + epoch.FirstBlock, epoch.LastBlock) } - if c.Status == model.EpochStatus_ClaimSubmitted { - if c.OutputsMerkleRoot == nil { - return fmt.Errorf("unexpected epoch state. missing claim_hash.") + + mustHaveOutputsMerkleRoot := epoch.Status == model.EpochStatus_ClaimSubmitted || + epoch.Status == model.EpochStatus_ClaimAccepted || + epoch.Status == model.EpochStatus_ClaimComputed + if mustHaveOutputsMerkleRoot { + if epoch.OutputsMerkleRoot == nil { + return fmt.Errorf("unexpected epoch state. missing outputs_merkle_root.") } } - if c.Status == model.EpochStatus_ClaimAccepted || c.Status == model.EpochStatus_ClaimSubmitted { - if c.ClaimTransactionHash == nil { + + mustHaveClaimTransactionHash := epoch.Status == model.EpochStatus_ClaimSubmitted || + epoch.Status == model.EpochStatus_ClaimAccepted + if mustHaveClaimTransactionHash { + if epoch.ClaimTransactionHash == nil { return fmt.Errorf("unexpected epoch state. missing claim_transaction_hash.") } } @@ -519,13 +555,21 @@ func checkEpochSequenceConstraint(prevEpoch *model.Epoch, currEpoch *model.Epoch } func claimSubmittedEventMatches(application *model.Application, epoch *model.Epoch, event *iconsensus.IConsensusClaimSubmitted) bool { + if application == nil || epoch == nil || event == nil { + return false + } return application.IApplicationAddress == event.AppContract && + epoch.OutputsMerkleRoot != nil && *epoch.OutputsMerkleRoot == event.OutputsMerkleRoot && epoch.LastBlock == event.LastProcessedBlockNumber.Uint64() } func claimAcceptedEventMatches(application *model.Application, epoch *model.Epoch, event *iconsensus.IConsensusClaimAccepted) bool { + if application == nil || epoch == nil || event == nil { + return false + } return application.IApplicationAddress == event.AppContract && + epoch.OutputsMerkleRoot != nil && *epoch.OutputsMerkleRoot == event.OutputsMerkleRoot && epoch.LastBlock == event.LastProcessedBlockNumber.Uint64() } diff --git a/internal/claimer/claimer_test.go b/internal/claimer/claimer_test.go index 620650711..225e22165 100644 --- a/internal/claimer/claimer_test.go +++ b/internal/claimer/claimer_test.go @@ -15,6 +15,7 @@ import ( "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/service" + "github.com/davecgh/go-spew/spew" "github.com/lmittmann/tint" "github.com/ethereum/go-ethereum/common" @@ -158,7 +159,7 @@ func (m *claimerBlockchainMock) pollTransaction( args.Get(1).(*types.Receipt), args.Error(2) } -func (m *claimerBlockchainMock) getBlockNumber(ctx context.Context) (*big.Int, error) { +func (m *claimerBlockchainMock) getCommitmentBlockNumber(ctx context.Context) (*big.Int, error) { args := m.Called(ctx) return args.Get(0).(*big.Int), args.Error(1) @@ -570,7 +571,7 @@ func TestSubmitClaimWithAntecessorMismatch(t *testing.T) { b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil). Once() - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil). Once() @@ -595,7 +596,7 @@ func TestSubmitClaimWithEventMismatch(t *testing.T) { Return(app.IConsensusAddress, nil).Once() b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). Return(&iconsensus.IConsensus{}, prevEvent, wrongEvent, nil) - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil) errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) @@ -614,7 +615,7 @@ func TestSubmitClaimWithAntecessorOutOfOrder(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil) errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), big.NewInt(0)) @@ -637,7 +638,7 @@ func TestErrSubmittedMissingEvent(t *testing.T) { Return(app.IConsensusAddress, nil).Once() b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil) errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) @@ -658,7 +659,7 @@ func TestConsensusAddressChangedOnSubmittedClaims(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(wrongConsensusAddress, nil). Once() - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil). Once() @@ -735,6 +736,9 @@ func TestAcceptClaimWithAntecessorMismatch(t *testing.T) { Return(app.IConsensusAddress, nil).Once() b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, endBlock). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil) + r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything). + Return(nil). + Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 1, len(errs)) @@ -758,8 +762,12 @@ func TestAcceptClaimWithEventMismatch(t *testing.T) { Return(app.IConsensusAddress, nil).Once() b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, endBlock). Return(&iconsensus.IConsensus{}, prevEvent, wrongEvent, nil) + r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything). + Return(nil). + Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) + spew.Dump(errs) assert.Equal(t, 1, len(errs)) } @@ -775,6 +783,10 @@ func TestAcceptClaimWithAntecessorOutOfOrder(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() + r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything). + Return(nil). + Once() + errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(wrongEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), big.NewInt(0)) assert.Equal(t, 1, len(errs)) @@ -840,7 +852,7 @@ func TestConsensusAddressChangedOnAcceptedClaims(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(wrongConsensusAddress, nil). Once() - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil). Once() diff --git a/internal/claimer/service.go b/internal/claimer/service.go index 0efa3c6f4..a1949f8eb 100644 --- a/internal/claimer/service.go +++ b/internal/claimer/service.go @@ -12,7 +12,7 @@ import ( "github.com/cartesi/rollups-node/internal/config" "github.com/cartesi/rollups-node/internal/config/auth" - . "github.com/cartesi/rollups-node/internal/model" + "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/internal/repository" "github.com/cartesi/rollups-node/pkg/ethutil" "github.com/cartesi/rollups-node/pkg/service" @@ -34,22 +34,31 @@ type CreateInfo struct { type Service struct { service.Service - repository iclaimerRepository - blockchain iclaimerBlockchain - claimsInFlight map[int64]common.Hash // application.ID -> txHash + repository iclaimerRepository + blockchain iclaimerBlockchain + + // submitted claims waiting for confirmation from the blockchain. + // only accessed from tick, so no need for a lock + // contains: application ID -> transaction hash, with a maximum of one + // key per application due to the epoch advancement logic. + claimsInFlight map[int64]common.Hash submissionEnabled bool } const ClaimerConfigKey = "claimer" type PersistentConfig struct { - DefaultBlock DefaultBlock + DefaultBlock model.DefaultBlock ClaimSubmissionEnabled bool ChainID uint64 } func Create(ctx context.Context, c *CreateInfo) (*Service, error) { var err error + + if c == nil { + return nil, errors.New("invalid CreateInfo is nil") + } if err = ctx.Err(); err != nil { return nil, err // This returns context.Canceled or context.DeadlineExceeded. } @@ -129,6 +138,7 @@ func (s *Service) Stop(bool) []error { return nil } +// NOTE: tick is not re-entrant! func (s *Service) Tick() []error { errs := []error{} @@ -157,16 +167,15 @@ func (s *Service) Tick() []error { return nil } - // we have claims to check - // get the latest/safe/finalized, etc. block - endBlock, err := s.blockchain.getBlockNumber(s.Context) + // we have claims to check. Get the latest/safe/finalized, etc. block + commitmentBlockNumber, err := s.blockchain.getCommitmentBlockNumber(s.Context) if err != nil { errs = append(errs, err) return errs } - errs = append(errs, s.submitClaimsAndUpdateDatabase(acceptedOrSubmittedEpochs, computedEpochs, computedApps, endBlock)...) - errs = append(errs, s.acceptClaimsAndUpdateDatabase(acceptedEpochs, submittedEpochs, submittedApps, endBlock)...) + errs = append(errs, s.submitClaimsAndUpdateDatabase(acceptedOrSubmittedEpochs, computedEpochs, computedApps, commitmentBlockNumber)...) + errs = append(errs, s.acceptClaimsAndUpdateDatabase(acceptedEpochs, submittedEpochs, submittedApps, commitmentBlockNumber)...) return errs } @@ -178,7 +187,7 @@ func setupPersistentConfig( ) (*PersistentConfig, error) { config, err := repository.LoadNodeConfig[PersistentConfig](ctx, repo, ClaimerConfigKey) if config == nil && errors.Is(err, repository.ErrNotFound) { - nc := NodeConfig[PersistentConfig]{ + nc := model.NodeConfig[PersistentConfig]{ Key: ClaimerConfigKey, Value: PersistentConfig{ DefaultBlock: c.BlockchainDefaultBlock, diff --git a/pkg/contracts/iconsensus/iconsensus.go b/pkg/contracts/iconsensus/iconsensus.go index 3b92ebe5b..d0071cdd2 100644 --- a/pkg/contracts/iconsensus/iconsensus.go +++ b/pkg/contracts/iconsensus/iconsensus.go @@ -31,7 +31,7 @@ var ( // IConsensusMetaData contains all meta data concerning the IConsensus contract. var IConsensusMetaData = &bind.MetaData{ - ABI: "[{\"type\":\"function\",\"name\":\"getEpochLength\",\"inputs\":[],\"outputs\":[{\"name\":\"\",\"type\":\"uint256\",\"internalType\":\"uint256\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"getNumberOfAcceptedClaims\",\"inputs\":[],\"outputs\":[{\"name\":\"\",\"type\":\"uint256\",\"internalType\":\"uint256\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"isOutputsMerkleRootValid\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"internalType\":\"bytes32\"}],\"outputs\":[{\"name\":\"\",\"type\":\"bool\",\"internalType\":\"bool\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"submitClaim\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"internalType\":\"bytes32\"}],\"outputs\":[],\"stateMutability\":\"nonpayable\"},{\"type\":\"function\",\"name\":\"supportsInterface\",\"inputs\":[{\"name\":\"interfaceId\",\"type\":\"bytes4\",\"internalType\":\"bytes4\"}],\"outputs\":[{\"name\":\"\",\"type\":\"bool\",\"internalType\":\"bool\"}],\"stateMutability\":\"view\"},{\"type\":\"event\",\"name\":\"ClaimAccepted\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"indexed\":false,\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"indexed\":false,\"internalType\":\"bytes32\"}],\"anonymous\":false},{\"type\":\"event\",\"name\":\"ClaimSubmitted\",\"inputs\":[{\"name\":\"submitter\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"appContract\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"indexed\":false,\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"indexed\":false,\"internalType\":\"bytes32\"}],\"anonymous\":false},{\"type\":\"error\",\"name\":\"NotEpochFinalBlock\",\"inputs\":[{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"epochLength\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]},{\"type\":\"error\",\"name\":\"NotFirstClaim\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]},{\"type\":\"error\",\"name\":\"NotPastBlock\",\"inputs\":[{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"currentBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]}]", + ABI: "[{\"type\":\"function\",\"name\":\"getEpochLength\",\"inputs\":[],\"outputs\":[{\"name\":\"\",\"type\":\"uint256\",\"internalType\":\"uint256\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"getNumberOfAcceptedClaims\",\"inputs\":[],\"outputs\":[{\"name\":\"\",\"type\":\"uint256\",\"internalType\":\"uint256\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"getNumberOfSubmittedClaims\",\"inputs\":[],\"outputs\":[{\"name\":\"\",\"type\":\"uint256\",\"internalType\":\"uint256\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"isOutputsMerkleRootValid\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"internalType\":\"bytes32\"}],\"outputs\":[{\"name\":\"\",\"type\":\"bool\",\"internalType\":\"bool\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"submitClaim\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"internalType\":\"bytes32\"}],\"outputs\":[],\"stateMutability\":\"nonpayable\"},{\"type\":\"function\",\"name\":\"supportsInterface\",\"inputs\":[{\"name\":\"interfaceId\",\"type\":\"bytes4\",\"internalType\":\"bytes4\"}],\"outputs\":[{\"name\":\"\",\"type\":\"bool\",\"internalType\":\"bool\"}],\"stateMutability\":\"view\"},{\"type\":\"event\",\"name\":\"ClaimAccepted\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"indexed\":false,\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"indexed\":false,\"internalType\":\"bytes32\"}],\"anonymous\":false},{\"type\":\"event\",\"name\":\"ClaimSubmitted\",\"inputs\":[{\"name\":\"submitter\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"appContract\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"indexed\":false,\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"indexed\":false,\"internalType\":\"bytes32\"}],\"anonymous\":false},{\"type\":\"error\",\"name\":\"NotEpochFinalBlock\",\"inputs\":[{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"epochLength\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]},{\"type\":\"error\",\"name\":\"NotFirstClaim\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]},{\"type\":\"error\",\"name\":\"NotPastBlock\",\"inputs\":[{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"currentBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]}]", } // IConsensusABI is the input ABI used to generate the binding from. @@ -242,6 +242,37 @@ func (_IConsensus *IConsensusCallerSession) GetNumberOfAcceptedClaims() (*big.In return _IConsensus.Contract.GetNumberOfAcceptedClaims(&_IConsensus.CallOpts) } +// GetNumberOfSubmittedClaims is a free data retrieval call binding the contract method 0xee5e0faa. +// +// Solidity: function getNumberOfSubmittedClaims() view returns(uint256) +func (_IConsensus *IConsensusCaller) GetNumberOfSubmittedClaims(opts *bind.CallOpts) (*big.Int, error) { + var out []interface{} + err := _IConsensus.contract.Call(opts, &out, "getNumberOfSubmittedClaims") + + if err != nil { + return *new(*big.Int), err + } + + out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int) + + return out0, err + +} + +// GetNumberOfSubmittedClaims is a free data retrieval call binding the contract method 0xee5e0faa. +// +// Solidity: function getNumberOfSubmittedClaims() view returns(uint256) +func (_IConsensus *IConsensusSession) GetNumberOfSubmittedClaims() (*big.Int, error) { + return _IConsensus.Contract.GetNumberOfSubmittedClaims(&_IConsensus.CallOpts) +} + +// GetNumberOfSubmittedClaims is a free data retrieval call binding the contract method 0xee5e0faa. +// +// Solidity: function getNumberOfSubmittedClaims() view returns(uint256) +func (_IConsensus *IConsensusCallerSession) GetNumberOfSubmittedClaims() (*big.Int, error) { + return _IConsensus.Contract.GetNumberOfSubmittedClaims(&_IConsensus.CallOpts) +} + // IsOutputsMerkleRootValid is a free data retrieval call binding the contract method 0xe5cc8664. // // Solidity: function isOutputsMerkleRootValid(address appContract, bytes32 outputsMerkleRoot) view returns(bool)