Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ TARGET_OS?=$(shell uname)
export TARGET_OS

ROLLUPS_NODE_VERSION := 2.0.0-alpha.9
ROLLUPS_CONTRACTS_VERSION := 2.1.1
ROLLUPS_CONTRACTS_VERSION := 2.2.0
ROLLUPS_CONTRACTS_URL:=https://github.com/cartesi/rollups-contracts/releases/download/
ROLLUPS_CONTRACTS_ARTIFACT:=rollups-contracts-$(ROLLUPS_CONTRACTS_VERSION)-artifacts.tar.gz
ROLLUPS_CONTRACTS_SHA256:=2e7a105d656de2adafad6439a5ff00f35b997aaf27972bd1becc33dea8817861
ROLLUPS_CONTRACTS_SHA256:=31c20a8c50f794185957ebd6e554fc99c8e01f0fdf9a80628d031fb0edc7091d
ROLLUPS_PRT_CONTRACTS_VERSION := 2.1.0
ROLLUPS_PRT_CONTRACTS_URL:=https://github.com/cartesi/dave/releases/download/
ROLLUPS_PRT_CONTRACTS_ARTIFACT:=cartesi-rollups-prt-$(ROLLUPS_PRT_CONTRACTS_VERSION)-contract-artifacts.tar.gz
Expand Down
233 changes: 111 additions & 122 deletions internal/claimer/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package claimer
import (
"context"
"fmt"
"iter"
"log/slog"
"math/big"

Expand All @@ -15,8 +14,6 @@ import (
"github.com/cartesi/rollups-node/pkg/contracts/iconsensus"
"github.com/cartesi/rollups-node/pkg/ethutil"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -77,24 +74,24 @@ type claimerBlockchain struct {
defaultBlock config.DefaultBlock
}

func (self *claimerBlockchain) submitClaimToBlockchain(
func (cb *claimerBlockchain) submitClaimToBlockchain(
ic *iconsensus.IConsensus,
application *model.Application,
epoch *model.Epoch,
) (common.Hash, error) {
txHash := common.Hash{}
lastBlockNumber := new(big.Int).SetUint64(epoch.LastBlock)
tx, err := ic.SubmitClaim(self.txOpts, application.IApplicationAddress,
tx, err := ic.SubmitClaim(cb.txOpts, application.IApplicationAddress,
lastBlockNumber, *epoch.OutputsMerkleRoot)
if err != nil {
self.logger.Error("submitClaimToBlockchain:failed",
cb.logger.Error("submitClaimToBlockchain:failed",
"appContractAddress", application.IApplicationAddress,
"claimHash", *epoch.OutputsMerkleRoot,
"last_block", epoch.LastBlock,
"error", err)
} else {
txHash = tx.Hash()
self.logger.Debug("submitClaimToBlockchain:success",
cb.logger.Debug("submitClaimToBlockchain:success",
"appContractAddress", application.IApplicationAddress,
"claimHash", *epoch.OutputsMerkleRoot,
"last_block", epoch.LastBlock,
Expand All @@ -103,25 +100,59 @@ func (self *claimerBlockchain) submitClaimToBlockchain(
return txHash, err
}

func unwrapClaimSubmitted(
ic *iconsensus.IConsensus,
pull func() (log *types.Log, err error, ok bool),
type EventIterator interface {
Next() bool
Close() error
}

func newOracle(
nr func(*bind.CallOpts) (*big.Int, error),
) (
*iconsensus.IConsensusClaimSubmitted,
bool,
error,
func(ctx context.Context, block uint64) (*big.Int, error),
) {
log, err, ok := pull()
if !ok || err != nil {
return nil, false, err
return func(ctx context.Context, block uint64) (*big.Int, error) {
callOpts := &bind.CallOpts{
Context: ctx,
BlockNumber: new(big.Int).SetUint64(block),
}
numEvents, err := nr(callOpts)

if err != nil {
return nil, fmt.Errorf("failed to get event count in block %d: %w", block, err)
}
return numEvents, nil
}
}

func newOnHit[IT EventIterator](
ctx context.Context,
address common.Address,
filter func (*bind.FilterOpts, []common.Address, []common.Address) (IT, error),
onEvent func(IT),
) (
func(block uint64) error,
) {
return func(block uint64) error {
filterOpts := &bind.FilterOpts{
Context: ctx,
Start: block,
End: &block,
}
it, err := filter(filterOpts, nil, []common.Address{address})
if err != nil {
return fmt.Errorf("failed to retrieve events at block %d: %w", block, err)
}
defer it.Close()
for it.Next() {
onEvent(it)
}
return nil
}
ev, err := ic.ParseClaimSubmitted(*log)
return ev, true, err
}

// scan the event stream for a claimSubmitted event that matches claim.
// return this event and its successor
func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc(
ctx context.Context,
application *model.Application,
epoch *model.Epoch,
Expand All @@ -132,78 +163,44 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
*iconsensus.IConsensusClaimSubmitted,
error,
) {
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client)
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client)
if err != nil {
return nil, nil, nil, err
}

// filter must match:
// - `ClaimSubmitted` events
// - submitter == nil (any)
// - appContract == claim.IApplicationAddress
c, err := iconsensus.IConsensusMetaData.GetAbi()
topics, err := abi.MakeTopics(
[]any{c.Events[model.MonitoredEvent_ClaimSubmitted.String()].ID},
nil,
[]any{application.IApplicationAddress},
oracle := newOracle(ic.GetNumberOfSubmittedClaims)
events := []*iconsensus.IConsensusClaimSubmitted{}
onHit := newOnHit(ctx, application.IApplicationAddress, ic.FilterClaimSubmitted,
func(it *iconsensus.IConsensusClaimSubmittedIterator) {
event := it.Event
if (len(events) == 0) && claimSubmittedEventMatches(application, epoch, event) {
events = append(events, event)
} else if len(events) != 0 {
events = append(events, event)
}
},
)
if err != nil {
return nil, nil, nil, err
}

it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(epoch.LastBlock),
ToBlock: endBlock,
Addresses: []common.Address{application.IConsensusAddress},
Topics: topics,
})
numSubmittedClaims, err := oracle(ctx, epoch.LastBlock)
if err != nil {
return nil, nil, nil, err
}

// pull events instead of iterating
next, stop := iter.Pull2(it)
defer stop()
for {
event, ok, err := unwrapClaimSubmitted(ic, next)
if !ok || err != nil {
return ic, event, nil, err
}
lastBlock := event.LastProcessedBlockNumber.Uint64()

if claimSubmittedEventMatches(application, epoch, event) {
// found the event, does it has a successor? try to fetch it
succ, ok, err := unwrapClaimSubmitted(ic, next)
if !ok || err != nil {
return ic, event, nil, err
}
return ic, event, succ, err
} else if lastBlock > epoch.LastBlock {
err = fmt.Errorf("No matching claim, searched up to %v", event)
return nil, nil, nil, err
}
_, err = ethutil.FindTransitions(ctx, epoch.LastBlock, endBlock.Uint64(), numSubmittedClaims, oracle, onHit)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to walk ClaimSubmitted transitions: %w", err)
}
}

func unwrapClaimAccepted(
ic *iconsensus.IConsensus,
pull func() (log *types.Log, err error, ok bool),
) (
*iconsensus.IConsensusClaimAccepted,
bool,
error,
) {
log, err, ok := pull()
if !ok || err != nil {
return nil, false, err
if len(events) == 0 {
return ic, nil, nil, nil
} else if len(events) == 1 {
return ic, events[0], nil, nil
} else {
return ic, events[0], events[1], nil
}
ev, err := ic.ParseClaimAccepted(*log)
return ev, true, err
}

// scan the event stream for a claimAccepted event that matches claim.
// return this event and its successor
func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc(
ctx context.Context,
application *model.Application,
epoch *model.Epoch,
Expand All @@ -214,76 +211,68 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
*iconsensus.IConsensusClaimAccepted,
error,
) {
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client)
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client)
if err != nil {
return nil, nil, nil, err
}

// filter must match:
// - `ClaimAccepted` events
// - appContract == claim.IApplicationAddress
c, err := iconsensus.IConsensusMetaData.GetAbi()
topics, err := abi.MakeTopics(
[]any{c.Events[model.MonitoredEvent_ClaimAccepted.String()].ID},
[]any{application.IApplicationAddress},
oracle := newOracle(ic.GetNumberOfAcceptedClaims)
events := []*iconsensus.IConsensusClaimAccepted{}
filter := func(
opts *bind.FilterOpts,
_ []common.Address,
appContract []common.Address,
) (*iconsensus.IConsensusClaimAcceptedIterator, error) {
return ic.FilterClaimAccepted(opts, appContract)
}
onHit := newOnHit(ctx, application.IApplicationAddress, filter,
func(it *iconsensus.IConsensusClaimAcceptedIterator) {
event := it.Event
if (len(events) == 0) && claimAcceptedEventMatches(application, epoch, event) {
events = append(events, event)
} else if len(events) != 0 {
events = append(events, event)
}
},
)

numAcceptedClaims, err := oracle(ctx, epoch.LastBlock)
if err != nil {
return nil, nil, nil, err
}

it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(epoch.LastBlock),
ToBlock: endBlock,
Addresses: []common.Address{application.IConsensusAddress},
Topics: topics,
})
_, err = ethutil.FindTransitions(ctx, epoch.LastBlock, endBlock.Uint64(), numAcceptedClaims, oracle, onHit)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, fmt.Errorf("failed to walk ClaimAccepted transitions: %w", err)
}

// pull events instead of iterating
next, stop := iter.Pull2(it)
defer stop()
for {
event, ok, err := unwrapClaimAccepted(ic, next)
if !ok || err != nil {
return ic, event, nil, err
}
lastBlock := event.LastProcessedBlockNumber.Uint64()

if claimAcceptedEventMatches(application, epoch, event) {
// found the event, does it has a successor? try to fetch it
succ, ok, err := unwrapClaimAccepted(ic, next)
if !ok || err != nil {
return ic, event, nil, err
}
return ic, event, succ, err
} else if lastBlock > epoch.LastBlock {
err = fmt.Errorf("No matching claim, searched up to %v", event)
return nil, nil, nil, err
}
if len(events) == 0 {
return ic, nil, nil, nil
} else if len(events) == 1 {
return ic, events[0], nil, nil
} else {
return ic, events[0], events[1], nil
}
}

func (self *claimerBlockchain) getConsensusAddress(
func (cb *claimerBlockchain) getConsensusAddress(
ctx context.Context,
app *model.Application,
) (common.Address, error) {
return ethutil.GetConsensus(ctx, self.client, app.IApplicationAddress)
return ethutil.GetConsensus(ctx, cb.client, app.IApplicationAddress)
}

/* poll a transaction hash for its submission status and receipt */
func (self *claimerBlockchain) pollTransaction(
func (cb *claimerBlockchain) pollTransaction(
ctx context.Context,
txHash common.Hash,
endBlock *big.Int,
) (bool, *types.Receipt, error) {
_, isPending, err := self.client.TransactionByHash(ctx, txHash)
_, isPending, err := cb.client.TransactionByHash(ctx, txHash)
if err != nil || isPending {
return false, nil, err
}

receipt, err := self.client.TransactionReceipt(ctx, txHash)
receipt, err := cb.client.TransactionReceipt(ctx, txHash)
if err != nil {
return false, nil, err
}
Expand All @@ -296,9 +285,9 @@ func (self *claimerBlockchain) pollTransaction(
}

/* Retrieve the block number of "DefaultBlock" */
func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) {
func (cb *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) {
var nr int64
switch self.defaultBlock {
switch cb.defaultBlock {
case model.DefaultBlock_Pending:
nr = rpc.PendingBlockNumber.Int64()
case model.DefaultBlock_Latest:
Expand All @@ -308,10 +297,10 @@ func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, er
case model.DefaultBlock_Safe:
nr = rpc.SafeBlockNumber.Int64()
default:
return nil, fmt.Errorf("default block '%v' not supported", self.defaultBlock)
return nil, fmt.Errorf("default block '%v' not supported", cb.defaultBlock)
}

hdr, err := self.client.HeaderByNumber(ctx, big.NewInt(nr))
hdr, err := cb.client.HeaderByNumber(ctx, big.NewInt(nr))
if err != nil {
return nil, err
}
Expand Down
Loading
Loading