Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
28 changes: 22 additions & 6 deletions packages/testing/src/execution_testing/cli/evm_bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ class OpcodeWithOperands:

opcode: Op
operands: List[int] = field(default_factory=list)
args: List["OpcodeWithOperands | HexNumber"] = field(default_factory=list)
kwargs: Dict[str, "OpcodeWithOperands | HexNumber"] = field(
args: List["OpcodeWithOperands | HexNumber | str"] = field(
default_factory=list
)
kwargs: Dict[str, "OpcodeWithOperands | HexNumber | str"] = field(
default_factory=dict
)

Expand All @@ -49,7 +51,9 @@ def __str__(self) -> str:
output = f"{output}({', '.join(args)})"
return output

def opcode_or_int(self) -> "OpcodeWithOperands | HexNumber":
def opcode_or_int(
self, int_definitions: dict[int, str] | None = None
) -> "OpcodeWithOperands | HexNumber | str":
"""
Return self or an HexNumber if the opcode is a PUSH opcode and can be
seamlessly converted to int when used as a stack argument or keyword
Expand All @@ -69,6 +73,8 @@ def opcode_or_int(self) -> "OpcodeWithOperands | HexNumber":
value = self.operands[0]
min_bytes = max(1, (value.bit_length() + 7) // 8)
if self.opcode.data_portion_length == min_bytes:
if int_definitions and value in int_definitions:
return int_definitions[value]
return HexNumber(value)
return self

Expand Down Expand Up @@ -103,7 +109,9 @@ def __str__(self) -> str:


def process_evm_bytes( # noqa: D103
evm_bytes: bytes, assembly: bool
evm_bytes: bytes,
assembly: bool = False,
int_definitions: dict[int, str] | None = None,
) -> List[OpcodeWithOperands]:
evm_bytes_array = bytearray(evm_bytes)

Expand Down Expand Up @@ -143,7 +151,10 @@ def process_evm_bytes( # noqa: D103
reversed(opcodes[-opcode.popped_stack_items :])
)
if all(arg.opcode.pushed_stack_items == 1 for arg in args):
args_with_int = [arg.opcode_or_int() for arg in args]
args_with_int = [
arg.opcode_or_int(int_definitions=int_definitions)
for arg in args
]
opcodes = opcodes[: -opcode.popped_stack_items]
if opcode.kwargs and len(opcode.kwargs) == len(args_with_int):
opcode_with_operands.kwargs = dict(
Expand Down Expand Up @@ -218,14 +229,19 @@ def process_evm_bytes_string(
evm_bytes_hex_string: str,
assembly: bool = False,
skip_simplify: bool = False,
int_definitions: dict[int, str] | None = None,
) -> str:
"""Process the given EVM bytes hex string."""
if evm_bytes_hex_string.startswith("0x"):
evm_bytes_hex_string = evm_bytes_hex_string[2:]

evm_bytes = bytes.fromhex(evm_bytes_hex_string)
return format_opcodes(
process_evm_bytes(evm_bytes, assembly=assembly),
process_evm_bytes(
evm_bytes,
assembly=assembly,
int_definitions=int_definitions,
),
assembly=assembly,
skip_simplify=skip_simplify,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,100 @@ def has_index(self, d: int, g: int, v: int) -> bool:
v_match = True if self.indexes.value.count(v) else False

return d_match and g_match and v_match


def _match_index(idx: int | list, val: int) -> bool:
"""Check if an index specification matches a value."""
if isinstance(idx, int):
return idx == -1 or idx == val
if isinstance(idx, list):
return val in idx
return False


def resolve_expect_post(
expect_entries: list[dict],
d: int,
g: int,
v: int,
fork: Fork,
) -> tuple[dict, TransactionExceptionInstanceOrList | None]:
"""
Resolve expected post-state for given d, g, v and fork.

Used by generated Python tests at runtime. The expect_entries are
materialized Python dicts with resolved addresses and Account objects.
"""
for entry in expect_entries:
indexes = entry["indexes"]
if not _match_index(indexes.get("data", -1), d):
continue
if not _match_index(indexes.get("gas", -1), g):
continue
if not _match_index(indexes.get("value", -1), v):
continue

# Match fork against network constraints
network = entry["network"]
fork_set = ForkSet.model_validate(network)
if fork not in fork_set:
continue

# Found matching entry
result = entry.get("result", {})

# Resolve exception
exception: TransactionExceptionInstanceOrList | None = None
expect_exc = entry.get("expect_exception")
if expect_exc:
for constraint_str, exc_value in expect_exc.items():
exc_fork_set = ForkSet.model_validate(
constraint_str.split(",")
)
if fork in exc_fork_set:
exception = exc_value
break

return result, exception

raise ValueError(
f"No matching expect entry for d={d}, g={g}, v={v}, fork={fork}"
)


def resolve_expect_post_fork(
expect_entries: list[dict],
fork: Fork,
) -> tuple[dict, TransactionExceptionInstanceOrList | None]:
"""
Resolve expected post-state for a given fork only (no d/g/v matching).

Used by single-case generated Python tests that have fork-dependent
post-state (multiple expect sections with different networks but only
one (d, g, v) combo).
"""
for entry in expect_entries:
# Match fork against network constraints
network = entry["network"]
fork_set = ForkSet.model_validate(network)
if fork not in fork_set:
continue

# Found matching entry
result = entry.get("result", {})

# Resolve exception
exception: TransactionExceptionInstanceOrList | None = None
expect_exc = entry.get("expect_exception")
if expect_exc:
for constraint_str, exc_value in expect_exc.items():
exc_fork_set = ForkSet.model_validate(
constraint_str.split(",")
)
if fork in exc_fork_set:
exception = exc_value
break

return result, exception

raise ValueError(f"No matching expect entry for fork={fork}")
Loading
Loading