Skip to content
2 changes: 2 additions & 0 deletions codeflash/languages/function_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,7 @@ def process_single_candidate(
eval_ctx.record_successful_candidate(candidate.optimization_id, candidate_result.best_test_runtime, perf_gain)

# Check if this is a successful optimization
baseline_cv = original_code_baseline.benchmarking_test_results.timing_coefficient_of_variation()
is_successful_opt = speedup_critic(
candidate_result,
original_code_baseline.runtime,
Expand All @@ -1231,6 +1232,7 @@ def process_single_candidate(
best_throughput_until_now=None,
original_concurrency_metrics=original_code_baseline.concurrency_metrics,
best_concurrency_ratio_until_now=None,
baseline_timing_cv=baseline_cv,
) and quantity_of_tests_critic(candidate_result)

tree = self.build_runtime_info_tree(
Expand Down
65 changes: 62 additions & 3 deletions codeflash/models/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import math
from collections import Counter, defaultdict
from functools import lru_cache
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -962,17 +963,75 @@ def usable_runtime_data_by_test_case(self) -> dict[InvocationId, list[int]]:
return by_id

def total_passed_runtime(self) -> int:
"""Calculate the sum of runtimes of all test cases that passed.
"""Calculate the sum of median runtimes of all test cases that passed.

A testcase runtime is the minimum value of all looped execution runtimes.
A testcase runtime is the median value of all looped execution runtimes.
Using median instead of min reduces sensitivity to outlier-fast iterations
(lucky GC pauses, perfect cache alignment) that produce spurious speedups.

:return: The runtime in nanoseconds.
"""
import statistics

# TODO this doesn't look at the intersection of tests of baseline and original
return sum(
[min(usable_runtime_data) for _, usable_runtime_data in self.usable_runtime_data_by_test_case().items()]
statistics.median_low(usable_runtime_data)
for _, usable_runtime_data in self.usable_runtime_data_by_test_case().items()
)

def timing_coefficient_of_variation(self) -> float:
    """Return the median coefficient of variation (stdev / mean) across test cases.

    Each test case's CV is derived solely from its own loop-iteration runtimes,
    so the result reflects measurement noise within a test case rather than
    runtime differences between test cases (which stem from different inputs,
    not noise).

    Returns 0.0 when there is no runtime data, or when no test case has at
    least two iterations from which a sample standard deviation can be taken.
    """
    import statistics

    grouped_runtimes = self.usable_runtime_data_by_test_case()
    if not grouped_runtimes:
        return 0.0

    cvs: list[float] = []
    for iteration_runtimes in grouped_runtimes.values():
        # Single-pass Welford update: maintains the running mean and the sum of
        # squared deviations (m2) so mean and sample variance come from one scan
        # instead of the two passes statistics.mean + statistics.stdev would take.
        count = 0
        running_mean = 0.0
        m2 = 0.0
        for raw in iteration_runtimes:
            count += 1
            value = float(raw)
            diff_before = value - running_mean
            running_mean += diff_before / count
            diff_after = value - running_mean
            m2 += diff_before * diff_after
        if count < 2 or running_mean == 0.0:
            # Too few samples for a sample stdev, or a degenerate zero mean
            # that would make the ratio undefined — skip this test case.
            continue
        variance = m2 / (count - 1)
        # Rounding can push an exactly-zero variance slightly negative; clamp.
        stdev = math.sqrt(variance) if variance > 0.0 else 0.0
        cvs.append(stdev / running_mean)

    return statistics.median(cvs) if cvs else 0.0

def effective_loop_count(self) -> int:
"""Calculate the effective number of complete loops.

Expand Down
57 changes: 37 additions & 20 deletions codeflash/result/critic.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def speedup_critic(
best_throughput_until_now: int | None = None,
original_concurrency_metrics: ConcurrencyMetrics | None = None,
best_concurrency_ratio_until_now: float | None = None,
baseline_timing_cv: float | None = None,
) -> bool:
"""Take in a correct optimized Test Result and decide if the optimization should actually be surfaced to the user.

Expand All @@ -81,6 +82,8 @@ def speedup_critic(
- The noise floor is a function of the original code runtime. Currently, the noise floor is 2xMIN_IMPROVEMENT_THRESHOLD
when the original runtime is less than 10 microseconds, and becomes MIN_IMPROVEMENT_THRESHOLD for any higher runtime.
- The noise floor is doubled when benchmarking on a (noisy) GitHub Action virtual instance.
- If baseline timing CV (coefficient of variation) is available, the noise floor is raised
to at least the observed variance to avoid accepting benchmark noise as real speedup.

For async throughput (when available):
- Evaluates throughput improvements using MIN_THROUGHPUT_IMPROVEMENT_THRESHOLD
Expand All @@ -95,46 +98,60 @@ def speedup_critic(
if not disable_gh_action_noise and env_utils.is_ci():
noise_floor = noise_floor * 2 # Increase the noise floor in GitHub Actions mode

perf_gain = performance_gain(
original_runtime_ns=original_code_runtime, optimized_runtime_ns=candidate_result.best_test_runtime
)
# Adapt noise floor to observed baseline variance: if the baseline benchmark has
# high variance, require proportionally larger speedups to avoid accepting noise.
# Cap at 30% to avoid rejecting genuine large speedups on noisy CI runners.
if baseline_timing_cv is not None and baseline_timing_cv > noise_floor:
noise_floor = min(baseline_timing_cv, 0.30)

# Cache frequently used attributes locally to avoid repeated attribute access
optimized_runtime = candidate_result.best_test_runtime

perf_gain = performance_gain(original_runtime_ns=original_code_runtime, optimized_runtime_ns=optimized_runtime)
runtime_improved = perf_gain > noise_floor

# Check runtime comparison with best so far
runtime_is_best = best_runtime_until_now is None or candidate_result.best_test_runtime < best_runtime_until_now
runtime_is_best = best_runtime_until_now is None or optimized_runtime < best_runtime_until_now

# Fast-path: if there is no async throughput data, runtime is the sole acceptance criteria.
if original_async_throughput is None or candidate_result.async_throughput is None:
return runtime_improved and runtime_is_best

# At this point async throughput data is available; acceptance is OR of runtime/throughput/concurrency
# Early return if runtime already qualifies to avoid extra work
if runtime_improved and runtime_is_best:
return True

# Throughput evaluation

throughput_improved = True # Default to True if no throughput data
throughput_is_best = True # Default to True if no throughput data

if original_async_throughput is not None and candidate_result.async_throughput is not None:
if original_async_throughput > 0:
original_thr = original_async_throughput
optimized_thr = candidate_result.async_throughput
if original_thr is not None and optimized_thr is not None:
if original_thr > 0:
throughput_gain_value = throughput_gain(
original_throughput=original_async_throughput, optimized_throughput=candidate_result.async_throughput
original_throughput=original_thr, optimized_throughput=optimized_thr
)
throughput_improved = throughput_gain_value > MIN_THROUGHPUT_IMPROVEMENT_THRESHOLD

throughput_is_best = (
best_throughput_until_now is None or candidate_result.async_throughput > best_throughput_until_now
)
throughput_is_best = best_throughput_until_now is None or optimized_thr > best_throughput_until_now

if throughput_improved and throughput_is_best:
return True

# Concurrency evaluation
concurrency_improved = False
concurrency_is_best = True
if original_concurrency_metrics is not None and candidate_result.concurrency_metrics is not None:
conc_gain = concurrency_gain(original_concurrency_metrics, candidate_result.concurrency_metrics)
concurrency_improved = conc_gain > MIN_CONCURRENCY_IMPROVEMENT_THRESHOLD
concurrency_is_best = (
best_concurrency_ratio_until_now is None
or candidate_result.concurrency_metrics.concurrency_ratio > best_concurrency_ratio_until_now
)
if concurrency_improved and concurrency_is_best:
return True

# Accept if ANY of: runtime, throughput, or concurrency improves significantly
if original_async_throughput is not None and candidate_result.async_throughput is not None:
throughput_acceptance = throughput_improved and throughput_is_best
runtime_acceptance = runtime_improved and runtime_is_best
concurrency_acceptance = concurrency_improved and concurrency_is_best
return throughput_acceptance or runtime_acceptance or concurrency_acceptance
return runtime_improved and runtime_is_best
return False


def get_acceptance_reason(
Expand Down
182 changes: 182 additions & 0 deletions tests/test_critic.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,3 +799,185 @@ def test_parse_concurrency_metrics() -> None:
metrics_no_class = parse_concurrency_metrics(test_results_no_class, "my_function")
assert metrics_no_class is not None
assert metrics_no_class.concurrency_ratio == 2.0 # 5000000 / 2500000


def test_speedup_critic_with_baseline_cv() -> None:
    """High baseline variance must raise the noise floor enough to reject noisy wins."""
    baseline_runtime = 100000  # 100µs — the default noise floor here is 5%
    best_so_far = 100000

    def make_candidate(runtime_ns: int) -> OptimizedCandidateResult:
        # Minimal candidate whose only meaningful field is its best runtime.
        return OptimizedCandidateResult(
            max_loop_count=5,
            best_test_runtime=runtime_ns,
            behavior_test_results=TestResults(),
            benchmarking_test_results=TestResults(),
            optimization_candidate_index=0,
            total_candidate_timing=12,
        )

    modest = make_candidate(92500)  # ~8% faster than baseline

    # No CV supplied: an 8% gain clears the default 5% floor.
    assert speedup_critic(modest, baseline_runtime, best_so_far, disable_gh_action_noise=True)

    # A 15% CV lifts the floor to 15%, so the 8% gain is rejected as noise.
    assert not speedup_critic(
        modest,
        baseline_runtime,
        best_so_far,
        disable_gh_action_noise=True,
        baseline_timing_cv=0.15,
    )

    # A CV below the default floor leaves the 5% floor in effect — still accepted.
    assert speedup_critic(
        modest,
        baseline_runtime,
        best_so_far,
        disable_gh_action_noise=True,
        baseline_timing_cv=0.03,
    )

    # A 25% gain clears a 15% CV-derived floor.
    assert speedup_critic(
        make_candidate(80000),  # 25% faster
        baseline_runtime,
        best_so_far,
        disable_gh_action_noise=True,
        baseline_timing_cv=0.15,
    )

    # The CV-derived floor is capped at 30%, so even a 50% CV cannot block
    # this large speedup (60000 ns vs 100000 ns baseline).
    assert speedup_critic(
        make_candidate(60000),
        baseline_runtime,
        best_so_far,
        disable_gh_action_noise=True,
        baseline_timing_cv=0.50,
    )


def _make_test_invocation(runtime: int, loop_index: int = 0, iteration_id: str = "iter1") -> FunctionTestInvocation:
    """Build a passing FunctionTestInvocation carrying just enough fields for timing tests."""
    invocation_id = InvocationId(
        test_module_path="test_mod",
        test_class_name="TestClass",
        test_function_name="test_func",
        function_getting_tested="target",
        iteration_id=iteration_id,
    )
    return FunctionTestInvocation(
        id=invocation_id,
        file_name=Path("test.py"),
        did_pass=True,
        runtime=runtime,
        test_framework="pytest",
        loop_index=loop_index,
        test_type=TestType.GENERATED_REGRESSION,
        return_value=None,
        timed_out=False,
    )


def test_total_passed_runtime_uses_median() -> None:
    """total_passed_runtime must aggregate per-test-case timings with median_low, not min."""
    results = TestResults(
        test_results=[
            _make_test_invocation(runtime, loop_index=idx)
            for idx, runtime in enumerate((100, 200, 300))
        ]
    )

    # min would yield 100; median_low of three values picks the middle one.
    observed = results.total_passed_runtime()
    assert observed == 200, f"Expected median_low(100, 200, 300) = 200, got {observed}"


def test_total_passed_runtime_median_even_count() -> None:
    """With an even number of loop timings, median_low must pick the lower middle value."""
    results = TestResults(
        test_results=[
            _make_test_invocation(runtime, loop_index=idx)
            for idx, runtime in enumerate((100, 200, 300, 400))
        ]
    )

    # median_low([100, 200, 300, 400]) is 200 — the lower of the two middle values.
    observed = results.total_passed_runtime()
    assert observed == 200, f"Expected median_low(100, 200, 300, 400) = 200, got {observed}"


def test_timing_coefficient_of_variation() -> None:
    """CV must come from per-test-case loop iterations, not pooled raw timings."""
    import statistics

    # Identical runtimes within a single test case → zero variance → CV of 0.
    constant = TestResults(test_results=[_make_test_invocation(1000, loop_index=i) for i in range(4)])
    assert constant.timing_coefficient_of_variation() == 0.0

    # One varying test case: its CV equals stdev/mean of its loop iterations.
    samples = [100, 200, 100, 200]
    varying = TestResults(
        test_results=[_make_test_invocation(rt, loop_index=i) for i, rt in enumerate(samples)]
    )
    expected_cv = statistics.stdev(samples) / statistics.mean(samples)
    observed_cv = varying.timing_coefficient_of_variation()
    assert abs(observed_cv - expected_cv) < 0.001, f"Expected CV {expected_cv:.4f}, got {observed_cv:.4f}"

    # No results at all → 0.0.
    assert TestResults().timing_coefficient_of_variation() == 0.0


def test_timing_cv_multi_test_case() -> None:
    """Inter-test-case runtime spread must not be mistaken for measurement noise."""
    import statistics

    # Two test cases with runtimes 100x apart but zero internal variance.
    # Pooling all four raw iterations would give CV ≈ 1.30 (130%); computing
    # per test case gives CV 0.0 for each, so the median is 0.0.
    steady_pair = TestResults(
        test_results=[
            _make_test_invocation(100, loop_index=0, iteration_id="test_a"),
            _make_test_invocation(100, loop_index=1, iteration_id="test_a"),
            _make_test_invocation(10000, loop_index=0, iteration_id="test_b"),
            _make_test_invocation(10000, loop_index=1, iteration_id="test_b"),
        ]
    )
    assert steady_pair.timing_coefficient_of_variation() == 0.0

    # One stable and one noisy test case — the reported CV is the median of both.
    mixed = TestResults(
        test_results=[
            _make_test_invocation(1000, loop_index=0, iteration_id="stable"),
            _make_test_invocation(1000, loop_index=1, iteration_id="stable"),
            _make_test_invocation(1000, loop_index=0, iteration_id="noisy"),
            _make_test_invocation(2000, loop_index=1, iteration_id="noisy"),
        ]
    )
    noisy_cv = statistics.stdev([1000, 2000]) / statistics.mean([1000, 2000])
    expected = statistics.median([0.0, noisy_cv])
    assert abs(mixed.timing_coefficient_of_variation() - expected) < 0.001
Loading
Loading