From 8d6e61f8e2f05bef7277ee09a8847087d3a1aefc Mon Sep 17 00:00:00 2001 From: bartzbeielstein <32470350+bartzbeielstein@users.noreply.github.com> Date: Wed, 1 Apr 2026 17:28:39 +0200 Subject: [PATCH 1/2] docs: fit --- src/spotoptim/SpotOptim.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/spotoptim/SpotOptim.py b/src/spotoptim/SpotOptim.py index 7eb12e6..c5313a2 100644 --- a/src/spotoptim/SpotOptim.py +++ b/src/spotoptim/SpotOptim.py @@ -2716,12 +2716,12 @@ def remove_nan( return X[finite_mask], y[finite_mask] # ==================== - # Surrogate & Acquisition + # Surrogate Fitting # ==================== def _fit_surrogate(self, X: np.ndarray, y: np.ndarray) -> None: """Fit surrogate model to data. - Used in optimize() to fit the surrogate model. + Used by _fit_scheduler() to fit the surrogate model. If the number of points exceeds `self.max_surrogate_points`, a subset of points is selected using the selection dispatcher. @@ -2832,6 +2832,10 @@ def _fit_scheduler(self) -> None: X_for_surrogate = self.transform_X(self.X_) self._fit_surrogate(X_for_surrogate, self.y_) + # ==================== + # Prediction + # ==================== + def _predict_with_uncertainty(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Predict with uncertainty estimates, handling surrogates without return_std. Used in the _acquisition_function() method and in the plot_surrogate() method. 
From b8f89acd0c1bdba4fd734f045f17e035b4bb70fa Mon Sep 17 00:00:00 2001 From: bartzbeielstein <32470350+bartzbeielstein@users.noreply.github.com> Date: Wed, 1 Apr 2026 19:14:45 +0200 Subject: [PATCH 2/2] fix: global structure --- .github/workflows/release-preflight.yml | 16 +- src/spotoptim/SpotOptim.py | 3404 +++++++++++---------- tests/test_optimize_refactored_methods.py | 6 +- tests/test_run_sequential_loop_example.py | 2 +- 4 files changed, 1773 insertions(+), 1655 deletions(-) diff --git a/.github/workflows/release-preflight.yml b/.github/workflows/release-preflight.yml index 7e905e4..6d11cea 100644 --- a/.github/workflows/release-preflight.yml +++ b/.github/workflows/release-preflight.yml @@ -49,6 +49,7 @@ jobs: - name: Preflight token and push checks env: RELEASE_TOKEN: ${{ secrets.SEMANTIC_RELEASE_TOKEN || github.token }} + GH_TOKEN: ${{ secrets.SEMANTIC_RELEASE_TOKEN || github.token }} run: | set -euo pipefail @@ -60,14 +61,13 @@ jobs: echo "Checking authenticated read access to ${GITHUB_REPOSITORY}..." git ls-remote --exit-code origin HEAD >/dev/null - echo "Checking dry-run push permission to main branch..." - git push --dry-run origin HEAD:refs/heads/main >/dev/null - - temp_tag="preflight-sr-${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}" - git tag "${temp_tag}" HEAD - trap 'git tag -d "${temp_tag}" >/dev/null 2>&1 || true' EXIT + echo "Checking push permission via GitHub API..." + push_perm=$(gh api "repos/${GITHUB_REPOSITORY}" --jq '.permissions.push' 2>/dev/null || echo "false") + if [ "${push_perm}" != "true" ]; then + echo "::error::Token does not have push permission to ${GITHUB_REPOSITORY}. Set SEMANTIC_RELEASE_TOKEN to a PAT with repo write access." + exit 1 + fi - echo "Checking dry-run push permission for tags..." - git push --dry-run origin "refs/tags/${temp_tag}" >/dev/null + echo "Checking tag push permission (same as push)... already verified above." echo "Preflight passed: token and push permissions are sufficient for semantic-release." 
diff --git a/src/spotoptim/SpotOptim.py b/src/spotoptim/SpotOptim.py index c5313a2..48d9e7f 100644 --- a/src/spotoptim/SpotOptim.py +++ b/src/spotoptim/SpotOptim.py @@ -1006,10 +1006,6 @@ def __dir__(self): return list(d) - # ==================== - # Configuration & Helpers - # ==================== - def set_seed(self) -> None: """Set global random seeds for reproducibility. Sets seeds for: @@ -1039,7 +1035,12 @@ def set_seed(self) -> None: torch.cuda.manual_seed_all(self.seed) # ==================== - # var_type and bounds related methods + # TASK_VARS: + # detect_var_type + # modify_bounds_based_on_var_type + # repair_non_numeric + # handle_default_var_trans + # process_factor_bounds # ==================== def detect_var_type(self) -> list: @@ -1265,80 +1266,9 @@ def process_factor_bounds(self) -> None: self.bounds = processed_bounds # ==================== - # Statistics and Results: best hyperparameters - # ==================== - - def get_best_hyperparameters( - self, as_dict: bool = True - ) -> Union[Dict[str, Any], np.ndarray, None]: - """ - Get the best hyperparameter configuration found during optimization. - If noise handling is active (repeats_initial > 1 or OCBA), this returns the parameter - configuration associated with the best *mean* objective value. Otherwise, it returns - the configuration associated with the absolute best observed value. - - Args: - as_dict (bool, optional): If True, returns a dictionary mapping parameter names - to their values. If False, returns the raw numpy array. Defaults to True. - - Returns: - Union[Dict[str, Any], np.ndarray, None]: The best hyperparameter configuration. - Returns None if optimization hasn't started (no data). 
- - Examples: - ```{python} - from spotoptim import SpotOptim - from spotoptim.function import sphere - - opt = SpotOptim(fun=sphere, - bounds=[(-5, 5), (0, 10)], - n_initial=5, - var_name=["x", "y"], - verbose=True) - opt.optimize() - best_params = opt.get_best_hyperparameters() - print(best_params['x']) # Should be close to 0 - ``` - """ - if self.X_ is None or len(self.X_) == 0: - return None - - # Determine which "best" to use - if (self.repeats_initial > 1 or self.repeats_surrogate > 1) and hasattr( - self, "min_mean_X" - ): - best_x = self.min_mean_X - else: - best_x = self.best_x_ - - if not as_dict: - return best_x - - # Map factors using existing method (handles 2D, returns 2D) - # We pass best_x as (1, D) and get (1, D) back - mapped_x = self.map_to_factor_values(best_x.reshape(1, -1))[0] - - # Convert to dictionary with types - params = {} - names = ( - self.var_name if self.var_name else [f"p{i}" for i in range(len(best_x))] - ) - - for i, name in enumerate(names): - val = mapped_x[i] - - # Handle types if available (specifically int, as factors are already mapped) - if self.var_type: - v_type = self.var_type[i] - if v_type == "int": - val = int(round(val)) - - params[name] = val - - return params - - # ==================== - # Save, Load, and Reinitialization of Components (Pickling) + # TASK_SAVE_LOAD: + # get_pickle_safe_optimizer + # reinitialize_components # ==================== def get_pickle_safe_optimizer( @@ -1446,153 +1376,12 @@ def reinitialize_components(self) -> None: if not hasattr(self, "surrogate") or self.surrogate is None: self.init_surrogate() - # ==================== - # Surrogate - # ==================== - - def init_surrogate(self) -> None: - """Initialize or configure the surrogate model for optimization. Handles three surrogate configurations: - * List of surrogates: sets up multi-surrogate selection with probability weights and per-surrogate `max_surrogate_points`. 
- * None (default): creates a `GaussianProcessRegressor` with a - `ConstantKernel * Matern(nu=2.5)` kernel, 100 optimizer restarts, - and `normalize_y=True`. - * User-provided surrogate: accepted as-is; internal bookkeeping - attributes (`_max_surrogate_points_list`, - `_active_max_surrogate_points`) are still initialised. - After this method returns the following attributes are set: - * `self.surrogate` — the active surrogate model. - * `self._surrogates_list` — `list | None`. - * `self._prob_surrogate` — normalised selection probabilities or `None`. - * `self._max_surrogate_points_list` — per-surrogate point caps or `None`. - * `self._active_max_surrogate_points` — active cap. - - Raises: - ValueError: If the surrogate list is empty. - ValueError: If 'prob_surrogate' length does not match the surrogate list length. - ValueError: If 'max_surrogate_points' list length does not match the surrogate list length. - - Returns: - None - - Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - # Default surrogate (GaussianProcessRegressor) - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - ) - print(type(opt.surrogate).__name__) - ``` - - ```{python} - import numpy as np - from spotoptim import SpotOptim - from sklearn.ensemble import RandomForestRegressor - # User-provided surrogate - rf = RandomForestRegressor(n_estimators=50, random_state=42) - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - surrogate=rf, - ) - print(type(opt.surrogate).__name__) - ``` - - ```{python} - import numpy as np - from spotoptim import SpotOptim - from sklearn.ensemble import RandomForestRegressor - from sklearn.gaussian_process import GaussianProcessRegressor - # List of surrogates with selection probabilities - surrogates = [GaussianProcessRegressor(), RandomForestRegressor()] - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], 
- n_initial=5, - surrogate=surrogates, - prob_surrogate=[0.7, 0.3], - ) - print(opt._prob_surrogate) - print([type(s).__name__ for s in opt._surrogates_list]) - ``` - """ - self._surrogates_list = None - self._prob_surrogate = None - - if isinstance(self.surrogate, list): - self._surrogates_list = self.surrogate - if not self._surrogates_list: - raise ValueError("Surrogate list cannot be empty.") - - # Handle probabilities - if self.config.prob_surrogate is None: - # Uniform probability - n = len(self._surrogates_list) - self._prob_surrogate = [1.0 / n] * n - else: - probs = self.config.prob_surrogate - if len(probs) != len(self._surrogates_list): - raise ValueError( - f"Length of prob_surrogate ({len(probs)}) must match " - f"number of surrogates ({len(self._surrogates_list)})." - ) - # Normalize probabilities - total = sum(probs) - if not np.isclose(total, 1.0) and total > 0: - self._prob_surrogate = [p / total for p in probs] - else: - self._prob_surrogate = probs - - # Handle max_surrogate_points list - self._max_surrogate_points_list = None - if isinstance(self.config.max_surrogate_points, list): - if len(self.config.max_surrogate_points) != len(self._surrogates_list): - raise ValueError( - f"Length of max_surrogate_points ({len(self.config.max_surrogate_points)}) " - f"must match number of surrogates ({len(self._surrogates_list)})." 
- ) - self._max_surrogate_points_list = self.config.max_surrogate_points - else: - # If int or None, broadcast to list for easier indexing - self._max_surrogate_points_list = [ - self.config.max_surrogate_points - ] * len(self._surrogates_list) - - # Set initial surrogate and max points - self.surrogate = self._surrogates_list[0] - self._active_max_surrogate_points = self._max_surrogate_points_list[0] - - elif self.surrogate is None: - # Default single surrogate case - self._max_surrogate_points_list = None - self._active_max_surrogate_points = self.config.max_surrogate_points - - kernel = ConstantKernel(1.0, (1e-2, 1e12)) * Matern( - length_scale=1.0, length_scale_bounds=(1e-4, 1e2), nu=2.5 - ) - - # Determine optimizer for GPR - optimizer = "fmin_l_bfgs_b" # Default used by sklearn - if self.config.acquisition_optimizer_kwargs is not None: - optimizer = partial( - gpr_minimize_wrapper, **self.config.acquisition_optimizer_kwargs - ) - - self.surrogate = GaussianProcessRegressor( - kernel=kernel, - n_restarts_optimizer=100, - normalize_y=True, - random_state=self.seed, - optimizer=optimizer, - ) - # ==================== - # Dimension Reduction + # TASK_DIM: + # setup_dimension_reduction() + # to_red_dim() + # to_all_dim() # ==================== def setup_dimension_reduction(self) -> None: @@ -1780,7 +1569,13 @@ def sphere(X): return X_full # ==================== - # Variable Transformation and Mapping + # TASK_TRANSFORM: + # transform_value() + # inverse_transform_value() + # transform_X() + # inverse_transform_X() + # transform_bounds() + # map_to_factor_values() # ==================== def transform_value(self, x: float, trans: Optional[str]) -> float: @@ -2113,7 +1908,14 @@ def map_to_factor_values(self, X: np.ndarray) -> np.ndarray: return X_mapped # ==================== - # Initial Design + # TASK_INIT_DESIGN: + # get_initial_design() + # generate_initial_design() + # curate_initial_design() + # rm_initial_design_NA_values() + # validate_x0() + # 
check_size_initial_design() + # get_best_xy_initial_design() # ==================== def get_initial_design(self, X0: Optional[np.ndarray] = None) -> np.ndarray: @@ -2331,7 +2133,7 @@ def curate_initial_design(self, X0: np.ndarray) -> np.ndarray: return X0 - def rm_NA_values( + def rm_initial_design_NA_values( self, X0: np.ndarray, y0: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, int]: """Remove NaN/inf values from initial design evaluations. @@ -2362,14 +2164,14 @@ def rm_NA_values( ) X0 = np.array([[1, 2], [3, 4], [5, 6]]) y0 = np.array([5.0, np.nan, np.inf]) - X0_clean, y0_clean, n_eval = opt.rm_NA_values(X0, y0) + X0_clean, y0_clean, n_eval = opt.rm_initial_design_NA_values(X0, y0) print(X0_clean.shape) # (1, 2) print(y0_clean) # array([5.]) print(n_eval) # 3 # All valid values - no filtering X0 = np.array([[1, 2], [3, 4]]) y0 = np.array([5.0, 25.0]) - X0_clean, y0_clean, n_eval = opt.rm_NA_values(X0, y0) + X0_clean, y0_clean, n_eval = opt.rm_initial_design_NA_values(X0, y0) print(X0_clean.shape) # (2, 2) print(n_eval) # 2 ``` @@ -2666,58 +2468,152 @@ def get_best_xy_initial_design(self) -> None: print(f"Initial best: f(x) = {self.best_y_:.6f}") - def remove_nan( - self, X: np.ndarray, y: np.ndarray, stop_on_zero_return: bool = True - ) -> tuple: - """Remove rows where y contains NaN or inf values. - Used in the optimize() method after function evaluations. - - Args: - X (ndarray): Design matrix, shape (n_samples, n_features). - y (ndarray): Objective values, shape (n_samples,). - stop_on_zero_return (bool): If True, raise error when all values are removed. + # ==================== + # TASK_Surrogate: + # init_surrogate() + # _fit_surrogate() + # _fit_scheduler() + # ==================== - Returns: - tuple: (X_clean, y_clean) with NaN/inf rows removed. + def init_surrogate(self) -> None: + """Initialize or configure the surrogate model for optimization. 
Handles three surrogate configurations: + * List of surrogates: sets up multi-surrogate selection with probability weights and per-surrogate `max_surrogate_points`. + * None (default): creates a `GaussianProcessRegressor` with a + `ConstantKernel * Matern(nu=2.5)` kernel, 100 optimizer restarts, + and `normalize_y=True`. + * User-provided surrogate: accepted as-is; internal bookkeeping + attributes (`_max_surrogate_points_list`, + `_active_max_surrogate_points`) are still initialised. + After this method returns the following attributes are set: + * `self.surrogate` — the active surrogate model. + * `self._surrogates_list` — `list | None`. + * `self._prob_surrogate` — normalised selection probabilities or `None`. + * `self._max_surrogate_points_list` — per-surrogate point caps or `None`. + * `self._active_max_surrogate_points` — active cap. Raises: - ValueError: If all values are NaN/inf and stop_on_zero_return is True. + ValueError: If the surrogate list is empty. + ValueError: If 'prob_surrogate' length does not match the surrogate list length. + ValueError: If 'max_surrogate_points' list length does not match the surrogate list length. 
+ + Returns: + None Examples: ```{python} import numpy as np from spotoptim import SpotOptim - from spotoptim.function import sphere - opt = SpotOptim(fun=sphere, bounds=[(-5, 5)]) - X = np.array([[1, 2], [3, 4], [5, 6]]) - y = np.array([1.0, np.nan, np.inf]) - X_clean, y_clean = opt.remove_nan(X, y, stop_on_zero_return=False) - print("Clean X:", X_clean) - print("Clean y:", y_clean) + # Default surrogate (GaussianProcessRegressor) + opt = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + ) + print(type(opt.surrogate).__name__) + ``` + + ```{python} + import numpy as np + from spotoptim import SpotOptim + from sklearn.ensemble import RandomForestRegressor + # User-provided surrogate + rf = RandomForestRegressor(n_estimators=50, random_state=42) + opt = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + surrogate=rf, + ) + print(type(opt.surrogate).__name__) + ``` + + ```{python} + import numpy as np + from spotoptim import SpotOptim + from sklearn.ensemble import RandomForestRegressor + from sklearn.gaussian_process import GaussianProcessRegressor + # List of surrogates with selection probabilities + surrogates = [GaussianProcessRegressor(), RandomForestRegressor()] + opt = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + surrogate=surrogates, + prob_surrogate=[0.7, 0.3], + ) + print(opt._prob_surrogate) + print([type(s).__name__ for s in opt._surrogates_list]) ``` """ - # Find finite values - finite_mask = np.isfinite(y) + self._surrogates_list = None + self._prob_surrogate = None - if not np.any(finite_mask): - msg = "All objective function values are NaN or inf." 
- if stop_on_zero_return: - raise ValueError(msg) + if isinstance(self.surrogate, list): + self._surrogates_list = self.surrogate + if not self._surrogates_list: + raise ValueError("Surrogate list cannot be empty.") + + # Handle probabilities + if self.config.prob_surrogate is None: + # Uniform probability + n = len(self._surrogates_list) + self._prob_surrogate = [1.0 / n] * n else: - if self.verbose: - print(f"Warning: {msg} Returning empty arrays.") - return np.array([]).reshape(0, X.shape[1]), np.array([]) + probs = self.config.prob_surrogate + if len(probs) != len(self._surrogates_list): + raise ValueError( + f"Length of prob_surrogate ({len(probs)}) must match " + f"number of surrogates ({len(self._surrogates_list)})." + ) + # Normalize probabilities + total = sum(probs) + if not np.isclose(total, 1.0) and total > 0: + self._prob_surrogate = [p / total for p in probs] + else: + self._prob_surrogate = probs - # Filter out non-finite values - n_removed = np.sum(~finite_mask) - if n_removed > 0 and self.verbose: - print(f"Warning: Removed {n_removed} sample(s) with NaN/inf values") + # Handle max_surrogate_points list + self._max_surrogate_points_list = None + if isinstance(self.config.max_surrogate_points, list): + if len(self.config.max_surrogate_points) != len(self._surrogates_list): + raise ValueError( + f"Length of max_surrogate_points ({len(self.config.max_surrogate_points)}) " + f"must match number of surrogates ({len(self._surrogates_list)})." 
+ ) + self._max_surrogate_points_list = self.config.max_surrogate_points + else: + # If int or None, broadcast to list for easier indexing + self._max_surrogate_points_list = [ + self.config.max_surrogate_points + ] * len(self._surrogates_list) - return X[finite_mask], y[finite_mask] + # Set initial surrogate and max points + self.surrogate = self._surrogates_list[0] + self._active_max_surrogate_points = self._max_surrogate_points_list[0] - # ==================== - # Surrogate Fitting - # ==================== + elif self.surrogate is None: + # Default single surrogate case + self._max_surrogate_points_list = None + self._active_max_surrogate_points = self.config.max_surrogate_points + + kernel = ConstantKernel(1.0, (1e-2, 1e12)) * Matern( + length_scale=1.0, length_scale_bounds=(1e-4, 1e2), nu=2.5 + ) + + # Determine optimizer for GPR + optimizer = "fmin_l_bfgs_b" # Default used by sklearn + if self.config.acquisition_optimizer_kwargs is not None: + optimizer = partial( + gpr_minimize_wrapper, **self.config.acquisition_optimizer_kwargs + ) + + self.surrogate = GaussianProcessRegressor( + kernel=kernel, + n_restarts_optimizer=100, + normalize_y=True, + random_state=self.seed, + optimizer=optimizer, + ) def _fit_surrogate(self, X: np.ndarray, y: np.ndarray) -> None: """Fit surrogate model to data. 
@@ -2833,7 +2729,9 @@ def _fit_scheduler(self) -> None: self._fit_surrogate(X_for_surrogate, self.y_) # ==================== - # Prediction + # TASK_PREDICT: + # _predict_with_uncertainty() + # _acquisition_function() # ==================== def _predict_with_uncertainty(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: @@ -2958,6 +2856,102 @@ def _acquisition_function(self, x: np.ndarray) -> np.ndarray: raise ValueError(f"Unknown acquisition function: {self.acquisition}") + # ==================== + # TASK_OPTIM: + # evaluate_function() + # _optimize_acquisition_tricands() + # _prepare_de_kwargs() + # _optimize_acquisition_de() + # _optimize_acquisition_scipy() + # _try_optimizer_candidates() + # remove_nan() + # _handle_acquisition_failure() + # _try_fallback_strategy() + # get_shape() + # ==================== + + def evaluate_function(self, X: np.ndarray) -> np.ndarray: + """Evaluate objective function at points X. + Used in the optimize() method to evaluate the objective function. + + Input Space: `X` is expected in Transformed and Mapped Space (Internal scale, Reduced dimensions). + Process as follows: + 1. Expands `X` to Transformed Space (Full dimensions) if dimension reduction is active. + 2. Inverse transforms `X` to Natural Space (Original scale). + 3. Evaluates the user function with points in Natural Space. + + If dimension reduction is active, expands `X` to full dimensions before evaluation. + Supports both single-objective and multi-objective functions. For multi-objective + functions, converts to single-objective using `mo2so` method. + + Args: + X (ndarray): Points to evaluate in Transformed and Mapped Space, shape (n_samples, n_reduced_features). + + Returns: + ndarray: Function values, shape (n_samples,). 
+ + Examples: + ```{python} + import numpy as np + from spotoptim import SpotOptim + # Single-objective function + opt_so = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + max_iter=10, + n_initial=5 + ) + X = np.array([[1.0, 2.0], [3.0, 4.0]]) + y = opt_so.evaluate_function(X) + print(f"Single-objective output: {y}") + ``` + + ```{python} + import numpy as np + from spotoptim import SpotOptim + # Multi-objective function (default: use first objective) + opt_mo = SpotOptim( + fun=lambda X: np.column_stack([ + np.sum(X**2, axis=1), + np.sum((X-1)**2, axis=1) + ]), + bounds=[(-5, 5), (-5, 5)], + max_iter=10, + n_initial=5 + ) + y_mo = opt_mo.evaluate_function(X) + print(f"Multi-objective output (first obj): {y_mo}") + ``` + """ + # Ensure X is 2D + X = np.atleast_2d(X) + + # Expand to full dimensions if needed + if self.red_dim: + X = self.to_all_dim(X) + + # Apply inverse transformations to get original scale for function evaluation + X_original = self.inverse_transform_X(X) + + # Map factor variables to original string values + X_for_eval = self.map_to_factor_values(X_original) + + # Evaluate function + y_raw = self.fun(X_for_eval, *self.args, **self.kwargs) + + # Convert to numpy array if needed + if not isinstance(y_raw, np.ndarray): + y_raw = np.array([y_raw]) + + # Handle multi-objective case + y = self.mo2so(y_raw) + + # Ensure y is 1D + if y.ndim > 1: + y = y.ravel() + + return y + def _optimize_acquisition_tricands(self) -> np.ndarray: """Optimize using geometric infill strategy via triangulation candidates. @@ -3337,31 +3331,81 @@ def is_valid(p, reference_set): return valid_candidates - def _handle_acquisition_failure(self) -> np.ndarray: - """Handle acquisition failure by proposing new design points. - Used in the suggest_next_infill_point() method. - This method is called when no new design points can be suggested - by the surrogate model (e.g., when the proposed point is too close - to existing points). 
It proposes a random space-filling design as a fallback. + def remove_nan( + self, X: np.ndarray, y: np.ndarray, stop_on_zero_return: bool = True + ) -> tuple: + """Remove rows where y contains NaN or inf values. + Used in the optimize() method after function evaluations. + + Args: + X (ndarray): Design matrix, shape (n_samples, n_features). + y (ndarray): Objective values, shape (n_samples,). + stop_on_zero_return (bool): If True, raise error when all values are removed. Returns: - ndarray: New design point as a fallback, shape (n_features,). + tuple: (X_clean, y_clean) with NaN/inf rows removed. + + Raises: + ValueError: If all values are NaN/inf and stop_on_zero_return is True. Examples: - >>> import numpy as np - >>> from spotoptim import SpotOptim - >>> opt = SpotOptim( - ... fun=lambda X: np.sum(X**2, axis=1), - ... bounds=[(-5, 5), (-5, 5)], - ... acquisition_failure_strategy='random' - ... ) - >>> opt.X_ = np.array([[0, 0], [1, 1]]) - >>> opt.y_ = np.array([0, 2]) - >>> x_fallback = opt._handle_acquisition_failure() - >>> x_fallback.shape - (2,) - >>> print(x_fallback) - [some new point within bounds] + ```{python} + import numpy as np + from spotoptim import SpotOptim + from spotoptim.function import sphere + opt = SpotOptim(fun=sphere, bounds=[(-5, 5)]) + X = np.array([[1, 2], [3, 4], [5, 6]]) + y = np.array([1.0, np.nan, np.inf]) + X_clean, y_clean = opt.remove_nan(X, y, stop_on_zero_return=False) + print("Clean X:", X_clean) + print("Clean y:", y_clean) + ``` + """ + # Find finite values + finite_mask = np.isfinite(y) + + if not np.any(finite_mask): + msg = "All objective function values are NaN or inf." 
+ if stop_on_zero_return: + raise ValueError(msg) + else: + if self.verbose: + print(f"Warning: {msg} Returning empty arrays.") + return np.array([]).reshape(0, X.shape[1]), np.array([]) + + # Filter out non-finite values + n_removed = np.sum(~finite_mask) + if n_removed > 0 and self.verbose: + print(f"Warning: Removed {n_removed} sample(s) with NaN/inf values") + + return X[finite_mask], y[finite_mask] + + + def _handle_acquisition_failure(self) -> np.ndarray: + """Handle acquisition failure by proposing new design points. + Used in the suggest_next_infill_point() method. + This method is called when no new design points can be suggested + by the surrogate model (e.g., when the proposed point is too close + to existing points). It proposes a random space-filling design as a fallback. + + Returns: + ndarray: New design point as a fallback, shape (n_features,). + + Examples: + >>> import numpy as np + >>> from spotoptim import SpotOptim + >>> opt = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... bounds=[(-5, 5), (-5, 5)], + ... acquisition_failure_strategy='random' + ... ) + >>> opt.X_ = np.array([[0, 0], [1, 1]]) + >>> opt.y_ = np.array([0, 2]) + >>> x_fallback = opt._handle_acquisition_failure() + >>> x_fallback.shape + (2,) + >>> print(x_fallback) + [some new point within bounds] """ if self.acquisition_failure_strategy == "random": # Default: random space-filling design (Latin Hypercube Sampling) @@ -3465,6 +3509,14 @@ def get_shape(self, y: np.ndarray) -> Tuple[int, Optional[int]]: # For higher dimensions, flatten to 1D return y.size, None + + # ==================== + # TASK_MO: + # store_mo() + # mo2so() + # ==================== + + def store_mo(self, y_mo: np.ndarray) -> None: """Store multi-objective values in self.y_mo. If multi-objective values are present (ndim==2), they are stored in self.y_mo. 
def apply_ocba(self) -> Optional[np.ndarray]:
    """Apply Optimal Computing Budget Allocation for noisy functions.

    Determines which existing design points should be re-evaluated based on
    the OCBA algorithm, which allocates additional evaluation budget to
    improve the quality of the estimated best design.

    Returns:
        Optional[ndarray]: Design points to re-evaluate, shape
            (n_re_eval, n_features), or None when OCBA is disabled or its
            preconditions are not met.

    Note:
        OCBA is only applied when all of the following hold:

        * ``self.repeats_initial > 1`` or ``self.repeats_surrogate > 1``
        * ``self.ocba_delta > 0``
        * all entries of ``self.var_y`` are > 0
        * at least 3 design points exist (``self.mean_X.shape[0] > 2``)

        When repeats and ``ocba_delta`` are active but the variance/point
        preconditions fail, a warning is printed (if verbose) and None is
        returned.
    """
    X_ocba = None
    repeats_active = self.repeats_initial > 1 or self.repeats_surrogate > 1
    if repeats_active and self.ocba_delta > 0:
        # OCBA needs strictly positive variances AND at least 3 points.
        conditions_met = bool(np.all(self.var_y > 0)) and self.mean_X.shape[0] > 2
        if not conditions_met:
            # Bug fix: the previous guard (`not all(var>0) and n<=2`) only
            # warned when BOTH preconditions failed, so e.g. two points with
            # positive variances skipped OCBA silently. The two branches are
            # now exact complements.
            if self.verbose:
                print("Warning: OCBA skipped (need >2 points with variance > 0)")
        else:
            # Get OCBA allocation from the mean/variance statistics.
            X_ocba = self.get_ocba_X(
                self.mean_X,
                self.mean_y,
                self.var_y,
                self.ocba_delta,
                verbose=self.verbose,
            )
            if self.verbose and X_ocba is not None:
                print(f" OCBA: Adding {X_ocba.shape[0]} re-evaluation(s)")

    return X_ocba
Evaluates the user function with points in Natural Space. - - If dimension reduction is active, expands `X` to full dimensions before evaluation. - Supports both single-objective and multi-objective functions. For multi-objective - functions, converts to single-objective using `mo2so` method. - - Args: - X (ndarray): Points to evaluate in Transformed and Mapped Space, shape (n_samples, n_reduced_features). - - Returns: - ndarray: Function values, shape (n_samples,). - - Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - # Single-objective function - opt_so = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - max_iter=10, - n_initial=5 - ) - X = np.array([[1.0, 2.0], [3.0, 4.0]]) - y = opt_so.evaluate_function(X) - print(f"Single-objective output: {y}") - ``` - - ```{python} - import numpy as np - from spotoptim import SpotOptim - # Multi-objective function (default: use first objective) - opt_mo = SpotOptim( - fun=lambda X: np.column_stack([ - np.sum(X**2, axis=1), - np.sum((X-1)**2, axis=1) - ]), - bounds=[(-5, 5), (-5, 5)], - max_iter=10, - n_initial=5 - ) - y_mo = opt_mo.evaluate_function(X) - print(f"Multi-objective output (first obj): {y_mo}") - ``` - """ - # Ensure X is 2D - X = np.atleast_2d(X) - - # Expand to full dimensions if needed - if self.red_dim: - X = self.to_all_dim(X) - - # Apply inverse transformations to get original scale for function evaluation - X_original = self.inverse_transform_X(X) - - # Map factor variables to original string values - X_for_eval = self.map_to_factor_values(X_original) - - # Evaluate function - y_raw = self.fun(X_for_eval, *self.args, **self.kwargs) - - # Convert to numpy array if needed - if not isinstance(y_raw, np.ndarray): - y_raw = np.array([y_raw]) - - # Handle multi-objective case - y = self.mo2so(y_raw) - - # Ensure y is 1D - if y.ndim > 1: - y = y.ravel() - - return y + # ==================== + # TASK_SELECT: + # select_distant_points() + # 
def suggest_next_infill_point(self) -> np.ndarray:
    """Suggest the next infill point(s) to evaluate (dispatcher).

    Used in both sequential and parallel optimization loops. Candidates are
    gathered first from the acquisition-function optimizer and then, while
    the batch is still short, one at a time from the fallback strategy.

    The returned points are in the Transformed and Mapped Space (internal
    optimization space): variable transformations (e.g. log, sqrt) and
    dimension reduction (fixed variables removed) have been applied.

    Process:
        1. Collect candidates from the acquisition-function optimizer.
        2. Fill remaining slots via the fallback strategy (duplicate-aware).
        3. As a last resort, return the last attempted point (possibly a
           duplicate) or a fresh point from the acquisition-failure handler.

    Returns:
        ndarray: Points to evaluate, shape (k, n_features) with
            1 <= k <= ``self.n_infill_points``. ``k`` may be smaller than
            ``n_infill_points`` when not enough unique candidates are found;
            callers must accept a partial batch.
    """
    candidates = []
    x_last = None  # last fallback attempt, kept as a last-resort point

    # 1. Candidates proposed by the acquisition-function optimizer.
    candidates.extend(
        self._try_optimizer_candidates(
            n_needed=self.n_infill_points, current_batch=candidates
        )
    )
    if len(candidates) >= self.n_infill_points:
        return np.vstack(candidates)

    # 2. Fill remaining slots one at a time; the current batch is passed
    #    along so the fallback can avoid duplicates.
    while len(candidates) < self.n_infill_points:
        cand, x_last = self._try_fallback_strategy(
            max_attempts=10, current_batch=candidates
        )
        if cand is None:
            # Fallback exhausted its attempts without a unique point;
            # return the partial batch collected so far.
            if self.verbose:
                print(
                    "Warning: Could not fill all infill points with unique candidates."
                )
            break
        candidates.append(cand)

    if len(candidates) > 0:
        return np.vstack(candidates)

    # 3. Nothing unique at all: return the last attempted point (a
    #    duplicate), or a fresh space-filling point if none was recorded.
    if self.verbose:
        print(
            "Warning: Could not find unique point after optimization candidates and fallback attempts. "
            "Returning last candidate (duplicate)."
        )

    if x_last is None:
        # Defensive: should practically not happen, since the fallback
        # always reports its last attempt.
        return self._handle_acquisition_failure().reshape(1, -1)

    return x_last.reshape(1, -1)
def determine_termination(self, timeout_start: float) -> str:
    """Explain why the optimization run stopped.

    The three possible reasons are checked in priority order:
    maximum evaluations reached, wall-clock budget exceeded, or
    normal completion (neither limit hit).

    Args:
        timeout_start (float): Start time of the run (``time.time()``).

    Returns:
        str: Human-readable termination message.
    """
    if len(self.y_) >= self.max_iter:
        return f"Optimization terminated: maximum evaluations ({self.max_iter}) reached"

    # max_time is expressed in minutes; compare against elapsed seconds.
    if time.time() - timeout_start >= self.max_time * 60:
        return f"Optimization terminated: time limit ({self.max_time:.2f} min) reached"

    return "Optimization finished successfully"
def apply_penalty_NA(
    self,
    y: np.ndarray,
    y_history: Optional[np.ndarray] = None,
    penalty_value: Optional[float] = None,
    sd: float = 0.1,
) -> np.ndarray:
    """Replace NaN/inf entries of ``y`` with a penalty plus random noise.

    Used after function evaluations. Follows the approach of
    ``spotpython.utils.repair.apply_penalty_NA``: the noise keeps repaired
    entries from all collapsing onto one identical penalty value.

    Args:
        y (ndarray): Objective values to repair. Non-numeric entries
            (e.g. ``"error"``, ``None``) are treated as NaN.
        y_history (ndarray, optional): Historical values used for the
            penalty statistics; when None, ``y`` itself is used.
        penalty_value (float, optional): Explicit replacement value. When
            None, the penalty is ``max(finite) + 3 * std(finite)`` if at
            least two finite reference values exist; otherwise it falls
            back to ``self.penalty_val``, then to the single finite value
            plus 1000, then to ``1e10``.
        sd (float): Standard deviation of the Gaussian noise added to the
            penalty. Default 0.1.

    Returns:
        ndarray: ``y`` with every NaN/inf replaced by
            ``penalty_value + N(0, sd)`` noise.
    """

    def _coerce(value):
        # Map values that cannot be converted to float (strings, None) to NaN.
        try:
            return float(value)
        except (ValueError, TypeError):
            return np.nan

    y = np.array([_coerce(v) for v in np.array(y).flatten()])
    bad = ~np.isfinite(y)

    if not bad.any():
        # Nothing to repair; importantly, the RNG is not consumed here.
        return y

    n_bad = np.sum(bad)

    if penalty_value is None:
        # Reference set for the adaptive penalty statistics.
        if y_history is not None:
            reference = y_history[np.isfinite(y_history)]
        else:
            reference = y[~bad]

        if len(reference) >= 2:
            # Adaptive penalty: worst observed value plus three sample stds.
            penalty_value = np.max(reference) + 3.0 * np.std(reference, ddof=1)
            if self.verbose:
                print(
                    f"Warning: Found {n_bad} NaN/inf value(s), replacing with "
                    f"adaptive penalty (max + 3*std = {penalty_value:.4f})"
                )
        else:
            # Fallback chain: self.penalty_val, single finite value + 1000,
            # then a large default.
            if self.penalty_val is not None:
                penalty_value = self.penalty_val
            elif len(reference) == 1:
                penalty_value = reference[0] + 1000.0
            else:
                penalty_value = 1e10
            if self.verbose:
                print(
                    f"Warning: Found {n_bad} NaN/inf value(s), insufficient finite values "
                    f"for adaptive penalty. Using penalty_value = {penalty_value}"
                )
    elif self.verbose:
        print(
            f"Warning: Found {n_bad} NaN/inf value(s), replacing with {penalty_value} + noise"
        )

    # Draw noise for the full array (keeps RNG consumption identical for a
    # given input shape), then repair only the invalid entries.
    noise = self.rng.normal(0, sd, y.shape)
    y[bad] = (penalty_value + noise)[bad]
    return y
Otherwise, returns - the point in 2D array format. + def _update_best_main_loop( + self, + x_next_repeated: np.ndarray, + y_next: np.ndarray, + start_time: Optional[float] = None, + ) -> None: + """Update best solution found during main optimization loop. + Checks if any new evaluations improve upon the current best solution. + If improvement is found, updates best_x_ and best_y_ attributes and + prints progress if verbose mode is enabled. Args: - x_next (ndarray): Next point to evaluate, shape (n_features,). - - Returns: - ndarray: Points to evaluate, shape (repeats_surrogate, n_features) - or (1, n_features) if repeats_surrogate == 1. + x_next_repeated (ndarray): Design points that were evaluated in transformed space, + shape (n_eval, n_features). + y_next (ndarray): Function values at x_next_repeated, shape (n_eval,). Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - from spotoptim.function import sphere, noisy_sphere - # Without repeats - - opt = SpotOptim( - fun=sphere, - bounds=[(-5, 5), (-5, 5)], - repeats_surrogate=1 - ) - x_next = np.array([1.0, 2.0]) - x_repeated = opt.update_repeats_infill_points(x_next) - print(x_repeated.shape) - - # With repeats for noisy function - opt_noisy = SpotOptim( - fun=noisy_sphere, - bounds=[(-5, 5), (-5, 5)], - repeats_surrogate=3 - ) - x_next = np.array([1.0, 2.0]) - x_repeated = opt_noisy.update_repeats_infill_points(x_next) - print(x_repeated.shape) - # All three copies should be identical - np.all(x_repeated[0] == x_repeated[1]) - ``` + >>> import numpy as np + >>> from spotoptim import SpotOptim + >>> opt = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... bounds=[(-5, 5), (-5, 5)], + ... n_initial=5, + ... verbose=True + ... 
) + >>> # Simulate optimization state + >>> opt.n_iter_ = 1 + >>> opt.best_x_ = np.array([1.0, 1.0]) + >>> opt.best_y_ = 2.0 + >>> + >>> # Case 1: New best found + >>> x_new = np.array([[0.1, 0.1], [0.5, 0.5]]) + >>> y_new = np.array([0.02, 0.5]) + >>> opt._update_best_main_loop(x_new, y_new) + Iteration 1: New best f(x) = 0.020000 + >>> opt.best_y_ + 0.02 + >>> + >>> # Case 2: No improvement + >>> opt.n_iter_ = 2 + >>> x_no_improve = np.array([[1.5, 1.5]]) + >>> y_no_improve = np.array([4.5]) + >>> opt._update_best_main_loop(x_no_improve, y_no_improve) + Iteration 2: f(x) = 4.500000 + >>> + >>> # Case 3: With noisy function + >>> opt_noise = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... bounds=[(-5, 5), (-5, 5)], + ... repeats_surrogate=2, + ... verbose=True + ... ) + >>> opt_noise.n_iter_ = 1 + >>> opt_noise.best_y_ = 2.0 + >>> opt_noise.min_mean_y = 1.5 + >>> y_noise = np.array([0.5]) + >>> x_noise = np.array([[0.5, 0.5]]) + >>> opt_noise._update_best_main_loop(x_noise, y_noise) + Iteration 1: New best f(x) = 0.500000, mean best: f(x) = 1.500000 """ - if x_next.ndim == 1: - x_next = x_next.reshape(1, -1) + # Update best + # Determine global best value for printing if shared variable exists + global_best_val = None + if hasattr(self, "shared_best_y") and self.shared_best_y is not None: + # Sync with global shared value + lock_obj = getattr(self, "shared_lock", None) + if lock_obj is not None: + with lock_obj: + if ( + self.best_y_ is not None + and self.best_y_ < self.shared_best_y.value + ): + self.shared_best_y.value = self.best_y_ - if self.repeats_surrogate > 1: - # Repeat each row repeats_surrogate times - # Note: np.repeat with axis=0 repeats rows [r1, r1, r2, r2...] 
- x_next_repeated = np.repeat(x_next, self.repeats_surrogate, axis=0) - else: - x_next_repeated = x_next - return x_next_repeated + min_y_next = np.min(y_next) + if min_y_next < self.shared_best_y.value: + self.shared_best_y.value = min_y_next + global_best_val = self.shared_best_y.value - def _run_sequential_loop( - self, timeout_start: float, effective_max_iter: int - ) -> Tuple[str, OptimizeResult]: - """Execute the main sequential optimization loop. + current_best = np.min(y_next) + if current_best < self.best_y_: + best_idx_in_new = np.argmin(y_next) + # x_next_repeated is in transformed space, convert to original for storage + self.best_x_ = self.inverse_transform_X( + x_next_repeated[best_idx_in_new].reshape(1, -1) + )[0] + self.best_y_ = current_best - Args: - timeout_start (float): Start time for timeout. - effective_max_iter (int): Maximum number of iterations for this run (may be overridden for restarts). + if self.verbose: + # Calculate progress + if self.max_time != np.inf and start_time is not None: + progress = (time.time() - start_time) / (self.max_time * 60) * 100 + progress_str = f"Time: {progress:.1f}%" + else: + prev_evals = sum(res.nfev for res in self.restarts_results_) + progress = (prev_evals + self.counter) / self.max_iter * 100 + progress_str = f"Evals: {progress:.1f}%" - Returns: - Tuple[str, OptimizeResult]: Tuple containing status and optimization result. + msg = f"Iter {self.n_iter_}" + if global_best_val is not None: + msg += f" | GlobalBest: {global_best_val:.6f}" + msg += f" | Best: {self.best_y_:.6f} | Rate: {self.success_rate:.2f} | {progress_str}" - Raises: - ValueError: - If excessive consecutive failures occur (e.g., due to NaN/inf values in evaluations), indicating a potential issue with the objective function. 
+ if (self.repeats_initial > 1) or (self.repeats_surrogate > 1): + msg += f" | Mean Best: {self.min_mean_y:.6f}" - Examples: - >>> import time - >>> import numpy as np - >>> from spotoptim import SpotOptim - >>> opt = SpotOptim( - ... fun=lambda X: np.sum(X**2, axis=1), - ... bounds=[(-5, 5), (-5, 5)], - ... n_initial=5, - ... max_iter=10, - ... seed=0, - ... n_jobs=1, # Use sequential optimization for deterministic output - ... verbose=True - ... ) - >>> X0, y0 = opt._initialize_run(X0=None, y0_known=None) - >>> X0, y0, n_evaluated = opt.rm_NA_values(X0, y0) - >>> opt.check_size_initial_design(y0, n_evaluated) - >>> opt.init_storage(X0, y0) - >>> opt._zero_success_count = 0 - >>> opt._success_history = [] - >>> opt.update_stats() - >>> opt.get_best_xy_initial_design() - >>> status, result = opt._run_sequential_loop(timeout_start=time.time(), effective_max_iter=10) - >>> print(status) - FINISHED - >>> print(result.message.splitlines()[0]) - Optimization terminated: maximum evaluations (10) reached - """ - consecutive_failures = 0 + print(msg) + elif self.verbose: + if self.max_time != np.inf and start_time is not None: + progress = (time.time() - start_time) / (self.max_time * 60) * 100 + progress_str = f"Time: {progress:.1f}%" + else: + prev_evals = sum(res.nfev for res in self.restarts_results_) + progress = (prev_evals + self.counter) / self.max_iter * 100 + progress_str = f"Evals: {progress:.1f}%" - while (len(self.y_) < effective_max_iter) and ( - time.time() < timeout_start + self.max_time * 60 - ): - # Check for excessive consecutive failures (infinite loop prevention) - if consecutive_failures > self.max_iter: - msg = ( - f"Optimization stopped due to {consecutive_failures} consecutive " - "invalid evaluations (NaN/inf). Check your objective function." 
- ) - if self.verbose: - print(f"Warning: {msg}") - return "FINISHED", OptimizeResult( - x=self.best_x_, - fun=self.best_y_, - nfev=len(self.y_), - nit=self.n_iter_, - success=False, - message=msg, - X=self.X_, - y=self.y_, - ) + current_val = np.min(y_next) + msg = f"Iter {self.n_iter_}" + if global_best_val is not None: + msg += f" | GlobalBest: {global_best_val:.6f}" + msg += f" | Best: {self.best_y_:.6f} | Curr: {current_val:.6f} | Rate: {self.success_rate:.2f} | {progress_str}" - # Increment iteration counter. This is not the same as number of function evaluations. - self.n_iter_ += 1 + if (self.repeats_initial > 1) or (self.repeats_surrogate > 1): + mean_y_new = np.mean(y_next) + msg += f" | Mean Curr: {mean_y_new:.6f}" + print(msg) - # Fit surrogate (use mean_y if noise, otherwise y_) - self._fit_scheduler() + def _handle_NA_new_points( + self, x_next: np.ndarray, y_next: np.ndarray + ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: + """Handle NaN/inf values in new evaluation points. + Applies penalties to NaN/inf values and removes any remaining invalid points. + If all evaluations are invalid, returns None for both arrays to signal that + the iteration should be skipped. - # Apply OCBA for noisy functions - X_ocba = self.apply_ocba() + Args: + x_next (ndarray): Design points that were evaluated, shape (n_eval, n_features). + y_next (ndarray): Function values at x_next, shape (n_eval,). - # Suggest next point - x_next = self.suggest_next_infill_point() + Returns: + Tuple[Optional[ndarray], Optional[ndarray]]: Tuple of (x_clean, y_clean). + Both are None if all evaluations were NaN/inf (iteration should be skipped). + Otherwise returns filtered arrays with only finite values. - # Repeat next point if repeats_surrogate > 1 - x_next_repeated = self.update_repeats_infill_points(x_next) + Examples: + >>> import numpy as np + >>> from spotoptim import SpotOptim + >>> opt = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... 
bounds=[(-5, 5), (-5, 5)], + ... n_initial=5, + ... verbose=True + ... ) + >>> # Simulate optimization state + >>> opt.y_ = np.array([1.0, 2.0, 3.0]) # Historical values + >>> opt.n_iter_ = 1 + >>> + >>> # Case 1: Some valid values + >>> x_next = np.array([[1, 2], [3, 4], [5, 6]]) + >>> y_next = np.array([5.0, np.nan, 10.0]) + >>> x_clean, y_clean = opt._handle_NA_new_points(x_next, y_next) + >>> x_clean.shape + (2, 2) + >>> y_clean.shape + (2,) + >>> + >>> # Case 2: All NaN/inf - should skip iteration + >>> x_all_bad = np.array([[1, 2], [3, 4]]) + >>> y_all_bad = np.array([np.nan, np.inf]) + >>> x_clean, y_clean = opt._handle_NA_new_points(x_all_bad, y_all_bad) + Warning: All new evaluations were NaN/inf, skipping iteration 1 + >>> x_clean is None + True + >>> y_clean is None + True + """ + # Handle NaN/inf values in new evaluations + # Use historical y values (self.y_) for computing penalty statistics + if self.penalty: + y_next = self.apply_penalty_NA(y_next, y_history=self.y_) - # Append OCBA points to new design points (if applicable) - if X_ocba is not None: - x_next_repeated = append(X_ocba, x_next_repeated, axis=0) + # Identify which points are valid (finite) BEFORE removing them + # Note: remove_nan filters based on y_next finite values - # Evaluate next point(s) including OCBA points - y_next = self.evaluate_function(x_next_repeated) + # Ensure y_next is a float array (maps non-convertible values like "error" or None to nan) + # This is critical if the objective function returns non-numeric values and penalty=False + if y_next.dtype == object: + # Use safe float conversion similar to apply_penalty_NA + def _safe_float(v): + try: + return float(v) + except (ValueError, TypeError): + return np.nan - # Handle NaN/inf values in new evaluations - x_next_repeated, y_next = self._handle_NA_new_points( - x_next_repeated, y_next - ) - if x_next_repeated is None: - consecutive_failures += 1 - continue # Skip iteration if all evaluations were invalid + # Reconstruct 
as float array + y_flat = np.array(y_next).flatten() + y_next = np.array([_safe_float(v) for v in y_flat]) - # Reset failure counter if we got valid points - consecutive_failures = 0 + finite_mask = np.isfinite(y_next) - # Update success rate BEFORE updating storage (so it compares against previous best) - self.update_success_rate(y_next) + X_next_clean, y_next_clean = self.remove_nan( + x_next, y_next, stop_on_zero_return=False + ) - # Check for restart - if self.success_rate == 0.0: - self._zero_success_count += 1 - else: - self._zero_success_count = 0 + # If we have multi-objective values, we need to filter them too + # The new MO values were appended to self.y_mo in evaluate_function -> mo2so -> store_mo + # So self.y_mo currently contains the INVALID points at the end. + if self.y_mo is not None: + n_new = len(y_next) + # Check if y_mo has the new points appended + if len(self.y_mo) >= n_new: + # The new points are at the end of y_mo + y_mo_new = self.y_mo[-n_new:] + y_mo_old = self.y_mo[:-n_new] - if self._zero_success_count >= self.restart_after_n: + # Filter the new MO points using the mask from y_next + y_mo_new_clean = y_mo_new[finite_mask] + + # Reconstruct y_mo + if len(y_mo_old) > 0: + self.y_mo = ( + np.vstack([y_mo_old, y_mo_new_clean]) + if len(y_mo_new_clean) > 0 + else y_mo_old + ) + else: + self.y_mo = y_mo_new_clean + else: if self.verbose: print( - f"Restarting optimization: success_rate 0 for {self._zero_success_count} iterations." + "Warning: y_mo size inconsistent with new points in _handle_NA_new_points" ) - status_message = "Restart triggered due to lack of improvement." 
- - # Expand results to full dimensions if needed - best_x_full = ( - self.to_all_dim(self.best_x_.reshape(1, -1))[0] - if self.red_dim - else self.best_x_ + # Skip this iteration if all new points were NaN/inf + if len(y_next_clean) == 0: + if self.verbose: + print( + f"Warning: All new evaluations were NaN/inf, skipping iteration {self.n_iter_}" ) - X_full = self.to_all_dim(self.X_) if self.red_dim else self.X_ + return None, None - # Map factor variables back to original strings - best_x_result = self.map_to_factor_values(best_x_full.reshape(1, -1))[0] - X_result = ( - self.map_to_factor_values(X_full) if self._factor_maps else X_full - ) + return X_next_clean, y_next_clean - res = OptimizeResult( - x=best_x_result, - fun=self.best_y_, - nfev=len(self.y_), - nit=self.n_iter_, - success=False, - message=status_message, - X=X_result, - y=self.y_, - ) - return "RESTART", res + def optimize_sequential_run( + self, + timeout_start: float, + X0: Optional[np.ndarray] = None, + y0_known: Optional[float] = None, + max_iter_override: Optional[int] = None, + shared_best_y=None, + shared_lock=None, + ) -> Tuple[str, OptimizeResult]: + """Perform a single sequential optimization run. + Calls _initialize_run, rm_initial_design_NA_values, check_size_initial_design, init_storage, get_best_xy_initial_design, and _run_sequential_loop. - # Update storage - self.update_storage(x_next_repeated, y_next) - # Update stats - self.update_stats() + Args: + timeout_start (float): Start time for timeout. + X0 (Optional[np.ndarray]): Initial design points in Natural Space, shape (n_initial, n_features). + y0_known (Optional[float]): Known best value for initial design. + max_iter_override (Optional[int]): Override for maximum number of iterations. + shared_best_y (Optional[float]): Shared best value for parallel runs. + shared_lock (Optional[Lock]): Shared lock for parallel runs. 
- # Log to TensorBoard - if self.tb_writer is not None: - # Log each new evaluation - for i in range(len(y_next)): - self._write_tensorboard_hparams(x_next_repeated[i], y_next[i]) - self._write_tensorboard_scalars() + Returns: + Tuple[str, OptimizeResult]: Tuple containing status and optimization result. - # Update best solution - self._update_best_main_loop( - x_next_repeated, y_next, start_time=timeout_start - ) + Raises: + ValueError: If the initial design has no valid points after removing NaN/inf values, or if the initial design is too small to proceed. - # Expand results to full dimensions if needed - # Note: best_x_ and X_ are already in original scale (stored that way) - best_x_full = ( - self.to_all_dim(self.best_x_.reshape(1, -1))[0] - if self.red_dim - else self.best_x_ - ) - X_full = self.to_all_dim(self.X_) if self.red_dim else self.X_ + Examples: + ```{python} + import time + import numpy as np + from spotoptim import SpotOptim + from spotoptim.function import sphere + opt = SpotOptim(fun=sphere, + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + max_iter=10, + seed=0, + n_jobs=1, # Use sequential optimization for deterministic output + verbose=True + ) + status, result = opt.optimize_sequential_run(timeout_start=time.time()) + print(status) + print(result.message.splitlines()[0]) + ``` + """ - # Determine termination reason - status_message = self.determine_termination(timeout_start) + # Store shared variable if provided + self.shared_best_y = shared_best_y + self.shared_lock = shared_lock - # Append statistics to match scipy.optimize.minimize format - message = ( - f"{status_message}\n" - f" Current function value: {float(self.best_y_):.6f}\n" - f" Iterations: {self.n_iter_}\n" - f" Function evaluations: {len(self.y_)}" - ) + # Initialize: Set seed, Design, Evaluate Initial Design, Init Storage & TensorBoard + X0, y0 = self._initialize_run(X0, y0_known) - # Close TensorBoard writer - self._close_tensorboard_writer() + # Handle NaN/inf values in initial 
design (remove invalid points) + X0, y0, n_evaluated = self.rm_initial_design_NA_values(X0, y0) - # Map factor variables back to original strings for results - best_x_result = self.map_to_factor_values(best_x_full.reshape(1, -1))[0] - X_result = self.map_to_factor_values(X_full) if self._factor_maps else X_full + # Check if we have enough valid points to continue + self.check_size_initial_design(y0, n_evaluated) - # Return scipy-style result - return "FINISHED", OptimizeResult( - x=best_x_result, - fun=self.best_y_, - nfev=len(self.y_), - nit=self.n_iter_, - success=True, - message=message, - X=X_result, - y=self.y_, + # Initialize storage and statistics + self.init_storage(X0, y0) + self._zero_success_count = 0 + self._success_history = [] # Clear success history for new run + + # Update stats after initial design + self.update_stats() + + # Log initial design to TensorBoard + self._init_tensorboard() + + # Determine and report initial best + self.get_best_xy_initial_design() + + # Run the main sequential optimization loop + effective_max_iter = ( + max_iter_override if max_iter_override is not None else self.max_iter ) + return self._run_sequential_loop(timeout_start, effective_max_iter) - def _update_storage_steady(self, x, y): - """Helper to safely append single point (for steady state). - This method is designed for the steady-state parallel optimization scenario, where new points are evaluated and returned asynchronously. - It safely appends new points to the existing storage of evaluated points and their function values, - while also updating the current best solution if the new point is better. + def _initialize_run( + self, X0: Optional[np.ndarray], y0_known: Optional[float] + ) -> Tuple[np.ndarray, np.ndarray]: + """Initialize optimization run: seed, design generation, initial evaluation. + Called from optimize_sequential_run (sequential path only). Args: - x (ndarray): - New point(s) in original scale, shape (n_features,) or (N, n_features). 
- y (float or ndarray): - Corresponding function value(s). + X0 (Optional[np.ndarray]): Initial design points in Natural Space, shape (n_initial, n_features). + y0_known (Optional[float]): Known best value for initial design. Returns: - None. This method updates the internal state of the optimizer. - - Note: - - This method assumes that the caller handles any necessary synchronization if used in a parallel context - (e.g., using locks when updating shared state). + Tuple[np.ndarray, np.ndarray]: Tuple containing initial design and corresponding objective values. Raises: - ValueError: If the input shapes are inconsistent or if y is not a scalar when x is a single point. + ValueError: If the initial design has no valid points after removing NaN/inf values, or if the initial design is too small to proceed. Examples: >>> import numpy as np @@ -4737,1051 +5016,865 @@ def _update_storage_steady(self, x, y): >>> opt = SpotOptim( ... fun=lambda X: np.sum(X**2, axis=1), ... bounds=[(-5, 5), (-5, 5)], - ... n_jobs=2 + ... n_initial=5, + ... seed=0, + ... x0=np.array([0.0, 0.0]), + ... verbose=True ... ) - >>> opt._update_storage_steady(np.array([1.0, 2.0]), 5.0) - >>> print(opt.X_) - [[1. 2.]] - >>> print(opt.y_) - [5.] - >>> print(opt.best_x_) - [1. 2.] 
- >>> print(opt.best_y_) - 5.0 - + >>> X0, y0 = opt._initialize_run(X0=None, y0_known=None) + >>> X0.shape + (5, 2) + >>> np.allclose(y0, np.sum(X0**2, axis=1)) + True """ - x = np.atleast_2d(x) - if self.X_ is None: - self.X_ = x - self.y_ = np.array([y]) + # Set seed for reproducibility + self.set_seed() + + # Set initial design (generate or process user-provided points) + X0 = self.get_initial_design(X0) + + # Curate initial design (remove duplicates, generate additional points if needed, repeat if necessary) + X0 = self.curate_initial_design(X0) + + # Evaluate initial design + if y0_known is not None and self.x0 is not None: + # Identify injected point to skip evaluation + dists = np.linalg.norm(X0 - self.x0, axis=1) + # Use a small tolerance for matching + matches = dists < 1e-9 + + if np.any(matches): + if self.verbose: + print("Skipping re-evaluation of injected best point.") + + # Initialize y0 + y0 = np.empty(len(X0)) + y0[:] = np.nan + + # Set known values + y0[matches] = y0_known + + # Evaluate others + not_matches = ~matches + if np.any(not_matches): + y0_others = self.evaluate_function(X0[not_matches]) + y0[not_matches] = y0_others + else: + # Injected point lost during curation? Should not happen if it was unique + y0 = self.evaluate_function(X0) else: - self.X_ = np.vstack([self.X_, x]) - self.y_ = np.append(self.y_, y) + y0 = self.evaluate_function(X0) - # Update best - if self.best_y_ is None or y < self.best_y_: - self.best_y_ = y - self.best_x_ = x.flatten() + return X0, y0 - self.min_y = self.best_y_ - self.min_X = self.best_x_ - def optimize_steady_state( - self, - timeout_start: float, - X0: Optional[np.ndarray], - y0_known: Optional[float] = None, - max_iter_override: Optional[int] = None, - ) -> Tuple[str, OptimizeResult]: - """Perform steady-state asynchronous optimization (n_jobs > 1). - This method implements a hybrid steady-state parallelization strategy. 
- The executor types are selected at runtime based on GIL availability: - Standard GIL build (Python ≤ 3.12 or GIL-enabled 3.13+): - * ``ProcessPoolExecutor`` (``eval_pool``) — objective function evaluations. - Process isolation ensures arbitrary callables (lambdas, closures) - serialized with ``dill`` run safely without touching shared state. - * ``ThreadPoolExecutor`` (``search_pool``) — surrogate search tasks. - Threads share the main-process heap; zero ``dill`` overhead. - A ``threading.Lock`` (``_surrogate_lock``) prevents a surrogate refit - from racing with an in-flight search thread. - Free-threaded build (``python3.13t`` / ``--disable-gil``): - * Both ``eval_pool`` and ``search_pool`` are ``ThreadPoolExecutor`` - instances. Threads achieve true CPU-level parallelism without the GIL. - The ``dill`` serialization step for eval tasks is eliminated — ``fun`` - is called directly from the shared heap. The ``_surrogate_lock`` is - still used to serialize surrogate reads and refits. - Pipeline: - 1. Parallel Initial Design: - ``n_initial`` points are dispatched to ``eval_pool``. Results are - collected via ``FIRST_COMPLETED`` until all initial evaluations finish. - 2. First Surrogate Fit: - Called on the main thread once all initial evaluations are in. - No lock is needed here because no search threads are active yet. - 3. Parallel Search (Thread Pool): - Up to ``n_jobs`` search tasks are submitted to ``search_pool``. - Each acquires ``_surrogate_lock`` before calling - ``suggest_next_infill_point()``, serializing concurrent surrogate reads. - 4. Steady-State Loop with Batch Dispatch: - - Search completes → candidate appended to ``pending_cands``. - - When ``len(pending_cands) >= eval_batch_size`` (or no search tasks - remain), all pending candidates are stacked into ``X_batch`` and - dispatched as a single eval call to ``eval_pool``. - On GIL builds this calls ``remote_batch_eval_wrapper`` (dill); - on free-threaded builds it calls ``fun`` directly in a thread. 
- - Batch eval completes → storage updated for every point, surrogate - refit once under ``_surrogate_lock``, new search slots filled. - - ``eval_batch_size=1`` (default) dispatches immediately on each - search completion, preserving the original one-point behavior. - - This cycle continues until ``max_iter`` evaluations or ``max_time`` - minutes is reached. - The optimization terminates when either: - - Total function evaluations reach ``max_iter`` (including initial design), OR - - Runtime exceeds ``max_time`` minutes + def update_repeats_infill_points(self, x_next: np.ndarray) -> np.ndarray: + """Repeat infill point for noisy function evaluation. Used in the sequential_loop. + For noisy objective functions (repeats_surrogate > 1), creates multiple + copies of the suggested point for repeated evaluation. Otherwise, returns + the point in 2D array format. Args: - timeout_start (float): Start time for timeout. - X0 (Optional[np.ndarray]): Initial design points in Natural Space, shape (n_initial, n_features). - y0_known (Optional[float]): Known best objective value from a previous run. - When provided together with ``self.x0``, the matching point in the initial - design is pre-filled with this value and not re-submitted to the worker - pool, saving one evaluation per restart (restart injection). - max_iter_override (Optional[int]): Override for maximum number of iterations. - - Raises: - RuntimeError: If all initial design evaluations fail, likely due to - pickling issues or missing imports in the worker process. - The error message provides guidance on how to address this issue. + x_next (ndarray): Next point to evaluate, shape (n_features,). Returns: - Tuple[str, OptimizeResult]: Tuple containing status and optimization result. + ndarray: Points to evaluate, shape (repeats_surrogate, n_features) + or (1, n_features) if repeats_surrogate == 1. 
Examples: ```{python} - import time + import numpy as np from spotoptim import SpotOptim - from spotoptim.function import sphere + from spotoptim.function import sphere, noisy_sphere + # Without repeats + opt = SpotOptim( - fun=sphere, - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - max_iter=10, - seed=0, - n_jobs=2, + fun=sphere, + bounds=[(-5, 5), (-5, 5)], + repeats_surrogate=1 ) - status, result = opt.optimize_steady_state(timeout_start=time.time(), X0=None) - print(status) - print(result.message.splitlines()[0]) + x_next = np.array([1.0, 2.0]) + x_repeated = opt.update_repeats_infill_points(x_next) + print(x_repeated.shape) + + # With repeats for noisy function + opt_noisy = SpotOptim( + fun=noisy_sphere, + bounds=[(-5, 5), (-5, 5)], + repeats_surrogate=3 + ) + x_next = np.array([1.0, 2.0]) + x_repeated = opt_noisy.update_repeats_infill_points(x_next) + print(x_repeated.shape) + # All three copies should be identical + np.all(x_repeated[0] == x_repeated[1]) ``` """ - # Setup similar to _optimize_single_run - self.set_seed() - X0 = self.get_initial_design(X0) - X0 = self.curate_initial_design(X0) - - # Restart injection: if a known best value is provided, pre-fill the matching - # point in the initial design so it is not re-submitted to the worker pool. - # This mirrors the logic in _initialize_run() used by the sequential path. 
- y0_prefilled = np.full(len(X0), np.nan) - if y0_known is not None and self.x0 is not None: - dists = np.linalg.norm(X0 - self.x0, axis=1) - matches = dists < 1e-9 - if np.any(matches): - if self.verbose: - print("Skipping re-evaluation of injected best point.") - y0_prefilled[matches] = y0_known - - # We need to know how many evaluations to do - effective_max_iter = ( - max_iter_override if max_iter_override is not None else self.max_iter - ) - - # Import dill locally (assuming installed) - import dill - - from contextlib import ExitStack - from concurrent.futures import ( - ProcessPoolExecutor, - ThreadPoolExecutor, - wait, - FIRST_COMPLETED, - ) + if x_next.ndim == 1: + x_next = x_next.reshape(1, -1) - # Detect free-threaded (no-GIL) Python build (3.13+). - # On free-threaded builds ThreadPoolExecutor gives true CPU parallelism, - # so we can use threads for eval too and skip dill serialization entirely. - _no_gil = is_gil_disabled() + if self.repeats_surrogate > 1: + # Repeat each row repeats_surrogate times + # Note: np.repeat with axis=0 repeats rows [r1, r1, r2, r2...] + x_next_repeated = np.repeat(x_next, self.repeats_surrogate, axis=0) + else: + x_next_repeated = x_next + return x_next_repeated - # Lock that serialises surrogate access: - # - search threads call suggest_next_infill_point() under the lock - # - the main thread calls _fit_scheduler() under the lock after each eval - # This prevents a surrogate refit from racing with an in-flight search. - _surrogate_lock = threading.Lock() - def _thread_search_task(): - """Search task for ThreadPoolExecutor: direct call, no dill.""" - with _surrogate_lock: - return self.suggest_next_infill_point() + def _run_sequential_loop( + self, timeout_start: float, effective_max_iter: int + ) -> Tuple[str, OptimizeResult]: + """Execute the main sequential optimization loop. 
- def _thread_eval_task_single(x): - """Thread-based single-point eval for free-threaded Python (no dill).""" - try: - x_2d = x.reshape(1, -1) - y_arr = self.evaluate_function(x_2d) - return x, y_arr[0] - except Exception as e: - return None, e + Args: + timeout_start (float): Start time for timeout. + effective_max_iter (int): Maximum number of iterations for this run (may be overridden for restarts). - def _thread_batch_eval_task(X_batch): - """Thread-based batch eval for free-threaded Python (no dill).""" - try: - y_batch = self.evaluate_function(X_batch) - return X_batch, y_batch - except Exception as e: - return None, e + Returns: + Tuple[str, OptimizeResult]: Tuple containing status and optimization result. - # Build executor pair: - # GIL build → eval: ProcessPoolExecutor, search: ThreadPoolExecutor - # No-GIL build → eval: ThreadPoolExecutor, search: ThreadPoolExecutor - with ExitStack() as _stack: - eval_pool = _stack.enter_context( - ThreadPoolExecutor(max_workers=self.n_jobs) - if _no_gil - else ProcessPoolExecutor(max_workers=self.n_jobs) - ) - search_pool = _stack.enter_context( - ThreadPoolExecutor(max_workers=self.n_jobs) - ) - futures = {} # map future -> type ('eval', 'search') + Raises: + ValueError: + If excessive consecutive failures occur (e.g., due to NaN/inf values in evaluations), indicating a potential issue with the objective function. - # --- Phase 1: Initial Design Evaluation --- - # Pre-filled points (restart injection) are stored on the main thread; - # all remaining points are submitted to eval_pool concurrently. - n_to_submit = 0 - for i, x in enumerate(X0): - if np.isfinite(y0_prefilled[i]): - # Known from restart injection — store directly, skip the pool. - self._update_storage_steady(x, y0_prefilled[i]) - continue - if _no_gil: - # Free-threaded: call fun directly in a thread — no dill. - fut = eval_pool.submit(_thread_eval_task_single, x) - else: - # GIL build: serialize with dill for process isolation. 
- _tb_writer_temp = self.tb_writer - self.tb_writer = None - try: - pickled_args = dill.dumps((self, x)) - finally: - self.tb_writer = _tb_writer_temp - fut = eval_pool.submit(remote_eval_wrapper, pickled_args) - futures[fut] = "eval" - n_to_submit += 1 + Examples: + >>> import time + >>> import numpy as np + >>> from spotoptim import SpotOptim + >>> opt = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... bounds=[(-5, 5), (-5, 5)], + ... n_initial=5, + ... max_iter=10, + ... seed=0, + ... n_jobs=1, # Use sequential optimization for deterministic output + ... verbose=True + ... ) + >>> X0, y0 = opt._initialize_run(X0=None, y0_known=None) + >>> X0, y0, n_evaluated = opt.rm_initial_design_NA_values(X0, y0) + >>> opt.check_size_initial_design(y0, n_evaluated) + >>> opt.init_storage(X0, y0) + >>> opt._zero_success_count = 0 + >>> opt._success_history = [] + >>> opt.update_stats() + >>> opt.get_best_xy_initial_design() + >>> status, result = opt._run_sequential_loop(timeout_start=time.time(), effective_max_iter=10) + >>> print(status) + FINISHED + >>> print(result.message.splitlines()[0]) + Optimization terminated: maximum evaluations (10) reached + """ + consecutive_failures = 0 - if self.verbose: - n_injected = int(np.sum(np.isfinite(y0_prefilled))) - suffix = ( - f" ({n_injected} injected from restart, skipped re-evaluation)." - if n_injected - else "." + while (len(self.y_) < effective_max_iter) and ( + time.time() < timeout_start + self.max_time * 60 + ): + # Check for excessive consecutive failures (infinite loop prevention) + if consecutive_failures > self.max_iter: + msg = ( + f"Optimization stopped due to {consecutive_failures} consecutive " + "invalid evaluations (NaN/inf). Check your objective function." 
) - print( - f"Submitted {n_to_submit} initial points for parallel evaluation{suffix}" + if self.verbose: + print(f"Warning: {msg}") + return "FINISHED", OptimizeResult( + x=self.best_x_, + fun=self.best_y_, + nfev=len(self.y_), + nit=self.n_iter_, + success=False, + message=msg, + X=self.X_, + y=self.y_, ) - # Wait for all submitted initial evaluations to complete. - initial_done_count = 0 - while initial_done_count < n_to_submit: - done, _ = wait(futures.keys(), return_when=FIRST_COMPLETED) - for fut in done: - # Clean up - ftype = futures.pop(fut) - if ftype != "eval": - continue + # Increment iteration counter. This is not the same as number of function evaluations. + self.n_iter_ += 1 - try: - x_done, y_done = fut.result() - if isinstance(y_done, Exception): - if self.verbose: - print(f"Eval failed: {y_done}") - else: - self._update_storage_steady(x_done, y_done) - except Exception as e: - if self.verbose: - print(f"Task failed: {e}") + # Fit surrogate (use mean_y if noise, otherwise y_) + self._fit_scheduler() - initial_done_count += 1 + # Apply OCBA for noisy functions + X_ocba = self.apply_ocba() - # Init tensorboard and stats - self._init_tensorboard() + # Suggest next point + x_next = self.suggest_next_infill_point() - if self.y_ is None or len(self.y_) == 0: - raise RuntimeError( - "All initial design evaluations failed. " - "Check your objective function for pickling issues or missing imports (e.g. numpy) in the worker process. " - "If defining functions in a notebook/script, ensure imports are inside the function." - ) + # Repeat next point if repeats_surrogate > 1 + x_next_repeated = self.update_repeats_infill_points(x_next) - self.update_stats() - self.get_best_xy_initial_design() + # Append OCBA points to new design points (if applicable) + if X_ocba is not None: + x_next_repeated = append(X_ocba, x_next_repeated, axis=0) - # Fit first surrogate (no lock needed — no search threads active yet) - if self.verbose: - print( - f"Initial design evaluated. 
Fitting surrogate... (Data size: {len(self.y_)})" - ) - self._fit_scheduler() + # Evaluate next point(s) including OCBA points + y_next = self.evaluate_function(x_next_repeated) - # --- Phase 2: Steady State Loop --- - if self.verbose: - print("Starting steady-state optimization loop...") + # Handle NaN/inf values in new evaluations + x_next_repeated, y_next = self._handle_NA_new_points( + x_next_repeated, y_next + ) + if x_next_repeated is None: + consecutive_failures += 1 + continue # Skip iteration if all evaluations were invalid - # Candidates returned by search tasks, waiting to form the next batch. - pending_cands: list = [] + # Reset failure counter if we got valid points + consecutive_failures = 0 - # Maps each in-flight batch_eval future → number of points it carries. - # Used for accurate budget accounting: reserved = committed + in_flight - # + pending, so we never over-dispatch search tasks. - _future_n_pts: dict = {} + # Update success rate BEFORE updating storage (so it compares against previous best) + self.update_success_rate(y_next) - def _flush_batch() -> None: - """Dispatch all pending_cands as a single batch eval task.""" - nonlocal pending_cands - X_batch = np.vstack(pending_cands) - n_in_batch = len(pending_cands) - pending_cands = [] - if _no_gil: - # Free-threaded: call fun directly in a thread — no dill. - fut_eval = eval_pool.submit(_thread_batch_eval_task, X_batch) - else: - # GIL build: serialize with dill for process isolation. 
- _tb_writer_temp = self.tb_writer - self.tb_writer = None - try: - pickled_args = dill.dumps((self, X_batch)) - finally: - self.tb_writer = _tb_writer_temp - fut_eval = eval_pool.submit(remote_batch_eval_wrapper, pickled_args) - futures[fut_eval] = "batch_eval" - _future_n_pts[fut_eval] = n_in_batch + # Check for restart + if self.success_rate == 0.0: + self._zero_success_count += 1 + else: + self._zero_success_count = 0 - def _batch_ready() -> bool: - """True when pending_cands should be flushed to eval_pool.""" - if not pending_cands: - return False - if len(pending_cands) >= self.eval_batch_size: - return True - # Flush early if no search tasks remain — prevents starvation - # when budget is nearly exhausted and no new searches will run. - return not any(t == "search" for t in futures.values()) + if self._zero_success_count >= self.restart_after_n: + if self.verbose: + print( + f"Restarting optimization: success_rate 0 for {self._zero_success_count} iterations." + ) - while (len(self.y_) < effective_max_iter) and ( - time.time() < timeout_start + self.max_time * 60 - ): - # 1. Flush pending candidates if the batch threshold is reached - # or if no search tasks are in flight (starvation guard). - if _batch_ready(): - _flush_batch() + status_message = "Restart triggered due to lack of improvement." - # 2. Fill open slots with search tasks (thread pool). - n_active = len(futures) - n_slots = self.n_jobs - n_active + # Expand results to full dimensions if needed + best_x_full = ( + self.to_all_dim(self.best_x_.reshape(1, -1))[0] + if self.red_dim + else self.best_x_ + ) + X_full = self.to_all_dim(self.X_) if self.red_dim else self.X_ - if n_slots > 0: - for _ in range(n_slots): - # Budget: committed evals + in-flight eval points - # + in-flight search tasks (each yields 1 cand) - # + candidates not yet dispatched. 
- n_in_flight = sum(_future_n_pts.values()) - n_searches = sum(1 for t in futures.values() if t == "search") - reserved = ( - len(self.y_) + n_in_flight + n_searches + len(pending_cands) - ) - if reserved < effective_max_iter: - # Search runs in a thread — no dill serialization; - # _thread_search_task holds _surrogate_lock. - fut = search_pool.submit(_thread_search_task) - futures[fut] = "search" - else: - break + # Map factor variables back to original strings + best_x_result = self.map_to_factor_values(best_x_full.reshape(1, -1))[0] + X_result = ( + self.map_to_factor_values(X_full) if self._factor_maps else X_full + ) - # 3. Flush again — new slot-filling may have pushed batch over - # the threshold (e.g. no budget left for more searches). - if _batch_ready(): - _flush_batch() + res = OptimizeResult( + x=best_x_result, + fun=self.best_y_, + nfev=len(self.y_), + nit=self.n_iter_, + success=False, + message=status_message, + X=X_result, + y=self.y_, + ) + return "RESTART", res - if not futures and not pending_cands: - break # Nothing left to do + # Update storage + self.update_storage(x_next_repeated, y_next) - if not futures: - # Pending candidates but no in-flight futures — flush now. - _flush_batch() - continue + # Update stats + self.update_stats() - # 4. Wait for any future to complete. - done, _ = wait(futures.keys(), return_when=FIRST_COMPLETED) - for fut in done: - ftype = futures.pop(fut) - try: - res = fut.result() - if isinstance(res, Exception): - if self.verbose: - print(f"Remote {ftype} failed: {res}") - # Failed search → slot freed; loop refills next iter. - # Failed batch_eval → points lost (budget not charged). 
- _future_n_pts.pop(fut, None) - continue + # Log to TensorBoard + if self.tb_writer is not None: + # Log each new evaluation + for i in range(len(y_next)): + self._write_tensorboard_hparams(x_next_repeated[i], y_next[i]) + self._write_tensorboard_scalars() - if ftype == "search": - x_cand = res - pending_cands.append(x_cand) - # Flush immediately if batch threshold reached — - # avoids an extra loop iteration for batch_size=1. - if _batch_ready(): - _flush_batch() + # Update best solution + self._update_best_main_loop( + x_next_repeated, y_next, start_time=timeout_start + ) - elif ftype == "batch_eval": - _future_n_pts.pop(fut, None) - X_done, y_done = res - if isinstance(y_done, Exception): - if self.verbose: - print(f"Batch eval failed: {y_done}") - else: - # Update storage for every point in the batch. - for xi, yi in zip(X_done, y_done): - self.update_success_rate(np.array([yi])) - self._update_storage_steady(xi, yi) - self.n_iter_ += 1 + # Expand results to full dimensions if needed + # Note: best_x_ and X_ are already in original scale (stored that way) + best_x_full = ( + self.to_all_dim(self.best_x_.reshape(1, -1))[0] + if self.red_dim + else self.best_x_ + ) + X_full = self.to_all_dim(self.X_) if self.red_dim else self.X_ - if self.verbose: - if self.max_time != np.inf: - prog_val = ( - (time.time() - timeout_start) - / (self.max_time * 60) - * 100 - ) - progress_str = f"Time: {prog_val:.1f}%" - else: - prog_val = ( - len(self.y_) / effective_max_iter * 100 - ) - progress_str = f"Evals: {prog_val:.1f}%" + # Determine termination reason + status_message = self.determine_termination(timeout_start) - print( - f"Iter {len(self.y_)}/{effective_max_iter}" - f" | Best: {self.best_y_:.6f}" - f" | Rate: {self.success_rate:.2f}" - f" | {progress_str}" - ) + # Append statistics to match scipy.optimize.minimize format + message = ( + f"{status_message}\n" + f" Current function value: {float(self.best_y_):.6f}\n" + f" Iterations: {self.n_iter_}\n" + f" Function 
evaluations: {len(self.y_)}" + ) - # Single surrogate refit for the whole batch — - # held under the lock so in-flight search threads - # do not read a partially-updated model. - with _surrogate_lock: - self._fit_scheduler() + # Close TensorBoard writer + self._close_tensorboard_writer() - except Exception as e: - _future_n_pts.pop(fut, None) - if self.verbose: - print(f"Error processing future: {e}") + # Map factor variables back to original strings for results + best_x_result = self.map_to_factor_values(best_x_full.reshape(1, -1))[0] + X_result = self.map_to_factor_values(X_full) if self._factor_maps else X_full + # Return scipy-style result return "FINISHED", OptimizeResult( - x=self.best_x_, + x=best_x_result, fun=self.best_y_, nfev=len(self.y_), nit=self.n_iter_, success=True, - message="Optimization finished (Steady State)", - X=self.X_, + message=message, + X=X_result, y=self.y_, ) - def suggest_next_infill_point(self) -> np.ndarray: - """Suggest next point to evaluate (dispatcher). - The returned point is in the Transformed and Mapped Space (Internal Optimization Space). - This means: - 1. Transformations (e.g., log, sqrt) have been applied. - 2. Dimension reduction has been applied (fixed variables removed). - Process: - 1. Try candidates from acquisition function optimizer. - 2. Handle acquisition failure (fallback). - 3. Return last attempt if all fails. + # ==================== + # TASK_OPTIM_PARALLEL: + # _update_storage_steady() + # optimize_steady_state() + # ==================== + + def _update_storage_steady(self, x, y): + """Helper to safely append single point (for steady state). + This method is designed for the steady-state parallel optimization scenario, where new points are evaluated and returned asynchronously. + It safely appends new points to the existing storage of evaluated points and their function values, + while also updating the current best solution if the new point is better. 
+ Args: + x (ndarray): + New point(s) in original scale, shape (n_features,) or (N, n_features). + y (float or ndarray): + Corresponding function value(s). Returns: - ndarray: Next point(s) to evaluate in Transformed and Mapped Space. - Shape is (n_infill_points, n_features). + None. This method updates the internal state of the optimizer. + + Note: + - This method assumes that the caller handles any necessary synchronization if used in a parallel context + (e.g., using locks when updating shared state). + + Raises: + ValueError: If the input shapes are inconsistent or if y is not a scalar when x is a single point. Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - def sphere(X): - X = np.atleast_2d(X) - return np.sum(X**2, axis=1) - opt = SpotOptim( - fun=sphere, - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - n_infill_points=2 - ) - # Need to initialize optimization state (X_, y_, surrogate) - # Normally done inside optimize() - np.random.seed(0) - opt.X_ = np.random.rand(10, 2) - opt.y_ = np.random.rand(10) - opt._fit_surrogate(opt.X_, opt.y_) - x_next = opt.suggest_next_infill_point() - x_next.shape - ``` - """ - # 1. Optimizer candidates - candidates = [] - opt_candidates = self._try_optimizer_candidates( - n_needed=self.n_infill_points, current_batch=candidates - ) - candidates.extend(opt_candidates) - - if len(candidates) >= self.n_infill_points: - return np.vstack(candidates) - - # 2. Try fallback strategy to fill remaining slots - while len(candidates) < self.n_infill_points: - # Just try one attempt at a time but loop - # We pass current batch to avoid dups - cand, x_last = self._try_fallback_strategy( - max_attempts=10, current_batch=candidates - ) - if cand is not None: - candidates.append(cand) - else: - # Fallback failed to find unique point even after retries - # Break and fill with last attempts or just return what we have? - # If we return partial batch, we might fail downstream if code expects n points? 
- # Actually code should handle any number of points returned by this method? - # Or duplicate valid points? - # Warn and use duplicate if absolutely necessary? - if self.verbose: - print( - "Warning: Could not fill all infill points with unique candidates." - ) - break - - if len(candidates) > 0: - return np.vstack(candidates) - - # 3. Return last attempt (duplicate) if absolutely nothing found - # This returns a single point (1, d). - # Should we return n copies? - # If n_infill_points > 1, we should probably output (n, d) - - if self.verbose: - print( - "Warning: Could not find unique point after optimization candidates and fallback attempts. " - "Returning last candidate (duplicate)." - ) + >>> import numpy as np + >>> from spotoptim import SpotOptim + >>> opt = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... bounds=[(-5, 5), (-5, 5)], + ... n_jobs=2 + ... ) + >>> opt._update_storage_steady(np.array([1.0, 2.0]), 5.0) + >>> print(opt.X_) + [[1. 2.]] + >>> print(opt.y_) + [5.] + >>> print(opt.best_x_) + [1. 2.] + >>> print(opt.best_y_) + 5.0 - # Verify x_last is not None - if x_last is None: - # Should practically not happen - x_next = self._handle_acquisition_failure() - return x_next.reshape(1, -1) + """ + x = np.atleast_2d(x) + if self.X_ is None: + self.X_ = x + self.y_ = np.array([y]) + else: + self.X_ = np.vstack([self.X_, x]) + self.y_ = np.append(self.y_, y) - # Return duplicated x_last to fill n_infill_points? OR just 1? - # Let's return 1 and let loop repeat it? - # But loop repeats based on x_next logic. - # If we return 1 point, it is treated as 1 point. - # If user asked for n_infill_points, maybe we should just return what we have (1 duplicated). 
+ # Update best + if self.best_y_ is None or y < self.best_y_: + self.best_y_ = y + self.best_x_ = x.flatten() - return x_last.reshape(1, -1) + self.min_y = self.best_y_ + self.min_X = self.best_x_ - def _handle_NA_new_points( - self, x_next: np.ndarray, y_next: np.ndarray - ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: - """Handle NaN/inf values in new evaluation points. - Applies penalties to NaN/inf values and removes any remaining invalid points. - If all evaluations are invalid, returns None for both arrays to signal that - the iteration should be skipped. + def optimize_steady_state( + self, + timeout_start: float, + X0: Optional[np.ndarray], + y0_known: Optional[float] = None, + max_iter_override: Optional[int] = None, + ) -> Tuple[str, OptimizeResult]: + """Perform steady-state asynchronous optimization (n_jobs > 1). + This method implements a hybrid steady-state parallelization strategy. + The executor types are selected at runtime based on GIL availability: + Standard GIL build (Python ≤ 3.12 or GIL-enabled 3.13+): + * ``ProcessPoolExecutor`` (``eval_pool``) — objective function evaluations. + Process isolation ensures arbitrary callables (lambdas, closures) + serialized with ``dill`` run safely without touching shared state. + * ``ThreadPoolExecutor`` (``search_pool``) — surrogate search tasks. + Threads share the main-process heap; zero ``dill`` overhead. + A ``threading.Lock`` (``_surrogate_lock``) prevents a surrogate refit + from racing with an in-flight search thread. + Free-threaded build (``python3.13t`` / ``--disable-gil``): + * Both ``eval_pool`` and ``search_pool`` are ``ThreadPoolExecutor`` + instances. Threads achieve true CPU-level parallelism without the GIL. + The ``dill`` serialization step for eval tasks is eliminated — ``fun`` + is called directly from the shared heap. The ``_surrogate_lock`` is + still used to serialize surrogate reads and refits. + Pipeline: + 1. 
Parallel Initial Design: + ``n_initial`` points are dispatched to ``eval_pool``. Results are + collected via ``FIRST_COMPLETED`` until all initial evaluations finish. + 2. First Surrogate Fit: + Called on the main thread once all initial evaluations are in. + No lock is needed here because no search threads are active yet. + 3. Parallel Search (Thread Pool): + Up to ``n_jobs`` search tasks are submitted to ``search_pool``. + Each acquires ``_surrogate_lock`` before calling + ``suggest_next_infill_point()``, serializing concurrent surrogate reads. + 4. Steady-State Loop with Batch Dispatch: + - Search completes → candidate appended to ``pending_cands``. + - When ``len(pending_cands) >= eval_batch_size`` (or no search tasks + remain), all pending candidates are stacked into ``X_batch`` and + dispatched as a single eval call to ``eval_pool``. + On GIL builds this calls ``remote_batch_eval_wrapper`` (dill); + on free-threaded builds it calls ``fun`` directly in a thread. + - Batch eval completes → storage updated for every point, surrogate + refit once under ``_surrogate_lock``, new search slots filled. + - ``eval_batch_size=1`` (default) dispatches immediately on each + search completion, preserving the original one-point behavior. + - This cycle continues until ``max_iter`` evaluations or ``max_time`` + minutes is reached. + The optimization terminates when either: + - Total function evaluations reach ``max_iter`` (including initial design), OR + - Runtime exceeds ``max_time`` minutes Args: - x_next (ndarray): Design points that were evaluated, shape (n_eval, n_features). - y_next (ndarray): Function values at x_next, shape (n_eval,). + timeout_start (float): Start time for timeout. + X0 (Optional[np.ndarray]): Initial design points in Natural Space, shape (n_initial, n_features). + y0_known (Optional[float]): Known best objective value from a previous run. 
+ When provided together with ``self.x0``, the matching point in the initial + design is pre-filled with this value and not re-submitted to the worker + pool, saving one evaluation per restart (restart injection). + max_iter_override (Optional[int]): Override for maximum number of iterations. + + Raises: + RuntimeError: If all initial design evaluations fail, likely due to + pickling issues or missing imports in the worker process. + The error message provides guidance on how to address this issue. Returns: - Tuple[Optional[ndarray], Optional[ndarray]]: Tuple of (x_clean, y_clean). - Both are None if all evaluations were NaN/inf (iteration should be skipped). - Otherwise returns filtered arrays with only finite values. + Tuple[str, OptimizeResult]: Tuple containing status and optimization result. Examples: - >>> import numpy as np - >>> from spotoptim import SpotOptim - >>> opt = SpotOptim( - ... fun=lambda X: np.sum(X**2, axis=1), - ... bounds=[(-5, 5), (-5, 5)], - ... n_initial=5, - ... verbose=True - ... 
) - >>> # Simulate optimization state - >>> opt.y_ = np.array([1.0, 2.0, 3.0]) # Historical values - >>> opt.n_iter_ = 1 - >>> - >>> # Case 1: Some valid values - >>> x_next = np.array([[1, 2], [3, 4], [5, 6]]) - >>> y_next = np.array([5.0, np.nan, 10.0]) - >>> x_clean, y_clean = opt._handle_NA_new_points(x_next, y_next) - >>> x_clean.shape - (2, 2) - >>> y_clean.shape - (2,) - >>> - >>> # Case 2: All NaN/inf - should skip iteration - >>> x_all_bad = np.array([[1, 2], [3, 4]]) - >>> y_all_bad = np.array([np.nan, np.inf]) - >>> x_clean, y_clean = opt._handle_NA_new_points(x_all_bad, y_all_bad) - Warning: All new evaluations were NaN/inf, skipping iteration 1 - >>> x_clean is None - True - >>> y_clean is None - True + ```{python} + import time + from spotoptim import SpotOptim + from spotoptim.function import sphere + opt = SpotOptim( + fun=sphere, + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + max_iter=10, + seed=0, + n_jobs=2, + ) + status, result = opt.optimize_steady_state(timeout_start=time.time(), X0=None) + print(status) + print(result.message.splitlines()[0]) + ``` """ - # Handle NaN/inf values in new evaluations - # Use historical y values (self.y_) for computing penalty statistics - if self.penalty: - y_next = self.apply_penalty_NA(y_next, y_history=self.y_) - - # Identify which points are valid (finite) BEFORE removing them - # Note: remove_nan filters based on y_next finite values + # Setup similar to _optimize_single_run + self.set_seed() + X0 = self.get_initial_design(X0) + X0 = self.curate_initial_design(X0) - # Ensure y_next is a float array (maps non-convertible values like "error" or None to nan) - # This is critical if the objective function returns non-numeric values and penalty=False - if y_next.dtype == object: - # Use safe float conversion similar to apply_penalty_NA - def _safe_float(v): - try: - return float(v) - except (ValueError, TypeError): - return np.nan + # Restart injection: if a known best value is provided, pre-fill the matching + # 
point in the initial design so it is not re-submitted to the worker pool. + # This mirrors the logic in _initialize_run() used by the sequential path. + y0_prefilled = np.full(len(X0), np.nan) + if y0_known is not None and self.x0 is not None: + dists = np.linalg.norm(X0 - self.x0, axis=1) + matches = dists < 1e-9 + if np.any(matches): + if self.verbose: + print("Skipping re-evaluation of injected best point.") + y0_prefilled[matches] = y0_known - # Reconstruct as float array - y_flat = np.array(y_next).flatten() - y_next = np.array([_safe_float(v) for v in y_flat]) + # We need to know how many evaluations to do + effective_max_iter = ( + max_iter_override if max_iter_override is not None else self.max_iter + ) - finite_mask = np.isfinite(y_next) + # Import dill locally (assuming installed) + import dill - X_next_clean, y_next_clean = self.remove_nan( - x_next, y_next, stop_on_zero_return=False + from contextlib import ExitStack + from concurrent.futures import ( + ProcessPoolExecutor, + ThreadPoolExecutor, + wait, + FIRST_COMPLETED, ) - # If we have multi-objective values, we need to filter them too - # The new MO values were appended to self.y_mo in evaluate_function -> mo2so -> store_mo - # So self.y_mo currently contains the INVALID points at the end. - if self.y_mo is not None: - n_new = len(y_next) - # Check if y_mo has the new points appended - if len(self.y_mo) >= n_new: - # The new points are at the end of y_mo - y_mo_new = self.y_mo[-n_new:] - y_mo_old = self.y_mo[:-n_new] + # Detect free-threaded (no-GIL) Python build (3.13+). + # On free-threaded builds ThreadPoolExecutor gives true CPU parallelism, + # so we can use threads for eval too and skip dill serialization entirely. 
+ _no_gil = is_gil_disabled() - # Filter the new MO points using the mask from y_next - y_mo_new_clean = y_mo_new[finite_mask] + # Lock that serialises surrogate access: + # - search threads call suggest_next_infill_point() under the lock + # - the main thread calls _fit_scheduler() under the lock after each eval + # This prevents a surrogate refit from racing with an in-flight search. + _surrogate_lock = threading.Lock() - # Reconstruct y_mo - if len(y_mo_old) > 0: - self.y_mo = ( - np.vstack([y_mo_old, y_mo_new_clean]) - if len(y_mo_new_clean) > 0 - else y_mo_old - ) + def _thread_search_task(): + """Search task for ThreadPoolExecutor: direct call, no dill.""" + with _surrogate_lock: + return self.suggest_next_infill_point() + + def _thread_eval_task_single(x): + """Thread-based single-point eval for free-threaded Python (no dill).""" + try: + x_2d = x.reshape(1, -1) + y_arr = self.evaluate_function(x_2d) + return x, y_arr[0] + except Exception as e: + return None, e + + def _thread_batch_eval_task(X_batch): + """Thread-based batch eval for free-threaded Python (no dill).""" + try: + y_batch = self.evaluate_function(X_batch) + return X_batch, y_batch + except Exception as e: + return None, e + + # Build executor pair: + # GIL build → eval: ProcessPoolExecutor, search: ThreadPoolExecutor + # No-GIL build → eval: ThreadPoolExecutor, search: ThreadPoolExecutor + with ExitStack() as _stack: + eval_pool = _stack.enter_context( + ThreadPoolExecutor(max_workers=self.n_jobs) + if _no_gil + else ProcessPoolExecutor(max_workers=self.n_jobs) + ) + search_pool = _stack.enter_context( + ThreadPoolExecutor(max_workers=self.n_jobs) + ) + futures = {} # map future -> type ('eval', 'search') + + # --- Phase 1: Initial Design Evaluation --- + # Pre-filled points (restart injection) are stored on the main thread; + # all remaining points are submitted to eval_pool concurrently. 
+ n_to_submit = 0 + for i, x in enumerate(X0): + if np.isfinite(y0_prefilled[i]): + # Known from restart injection — store directly, skip the pool. + self._update_storage_steady(x, y0_prefilled[i]) + continue + if _no_gil: + # Free-threaded: call fun directly in a thread — no dill. + fut = eval_pool.submit(_thread_eval_task_single, x) else: - self.y_mo = y_mo_new_clean - else: - if self.verbose: - print( - "Warning: y_mo size inconsistent with new points in _handle_NA_new_points" - ) + # GIL build: serialize with dill for process isolation. + _tb_writer_temp = self.tb_writer + self.tb_writer = None + try: + pickled_args = dill.dumps((self, x)) + finally: + self.tb_writer = _tb_writer_temp + fut = eval_pool.submit(remote_eval_wrapper, pickled_args) + futures[fut] = "eval" + n_to_submit += 1 - # Skip this iteration if all new points were NaN/inf - if len(y_next_clean) == 0: if self.verbose: + n_injected = int(np.sum(np.isfinite(y0_prefilled))) + suffix = ( + f" ({n_injected} injected from restart, skipped re-evaluation)." + if n_injected + else "." + ) print( - f"Warning: All new evaluations were NaN/inf, skipping iteration {self.n_iter_}" + f"Submitted {n_to_submit} initial points for parallel evaluation{suffix}" ) - return None, None - - return X_next_clean, y_next_clean - - def _update_best_main_loop( - self, - x_next_repeated: np.ndarray, - y_next: np.ndarray, - start_time: Optional[float] = None, - ) -> None: - """Update best solution found during main optimization loop. - Checks if any new evaluations improve upon the current best solution. - If improvement is found, updates best_x_ and best_y_ attributes and - prints progress if verbose mode is enabled. - - Args: - x_next_repeated (ndarray): Design points that were evaluated in transformed space, - shape (n_eval, n_features). - y_next (ndarray): Function values at x_next_repeated, shape (n_eval,). - Examples: - >>> import numpy as np - >>> from spotoptim import SpotOptim - >>> opt = SpotOptim( - ... 
fun=lambda X: np.sum(X**2, axis=1), - ... bounds=[(-5, 5), (-5, 5)], - ... n_initial=5, - ... verbose=True - ... ) - >>> # Simulate optimization state - >>> opt.n_iter_ = 1 - >>> opt.best_x_ = np.array([1.0, 1.0]) - >>> opt.best_y_ = 2.0 - >>> - >>> # Case 1: New best found - >>> x_new = np.array([[0.1, 0.1], [0.5, 0.5]]) - >>> y_new = np.array([0.02, 0.5]) - >>> opt._update_best_main_loop(x_new, y_new) - Iteration 1: New best f(x) = 0.020000 - >>> opt.best_y_ - 0.02 - >>> - >>> # Case 2: No improvement - >>> opt.n_iter_ = 2 - >>> x_no_improve = np.array([[1.5, 1.5]]) - >>> y_no_improve = np.array([4.5]) - >>> opt._update_best_main_loop(x_no_improve, y_no_improve) - Iteration 2: f(x) = 4.500000 - >>> - >>> # Case 3: With noisy function - >>> opt_noise = SpotOptim( - ... fun=lambda X: np.sum(X**2, axis=1), - ... bounds=[(-5, 5), (-5, 5)], - ... repeats_surrogate=2, - ... verbose=True - ... ) - >>> opt_noise.n_iter_ = 1 - >>> opt_noise.best_y_ = 2.0 - >>> opt_noise.min_mean_y = 1.5 - >>> y_noise = np.array([0.5]) - >>> x_noise = np.array([[0.5, 0.5]]) - >>> opt_noise._update_best_main_loop(x_noise, y_noise) - Iteration 1: New best f(x) = 0.500000, mean best: f(x) = 1.500000 - """ - # Update best - # Determine global best value for printing if shared variable exists - global_best_val = None - if hasattr(self, "shared_best_y") and self.shared_best_y is not None: - # Sync with global shared value - lock_obj = getattr(self, "shared_lock", None) - if lock_obj is not None: - with lock_obj: - if ( - self.best_y_ is not None - and self.best_y_ < self.shared_best_y.value - ): - self.shared_best_y.value = self.best_y_ + # Wait for all submitted initial evaluations to complete. 
+ initial_done_count = 0 + while initial_done_count < n_to_submit: + done, _ = wait(futures.keys(), return_when=FIRST_COMPLETED) + for fut in done: + # Clean up + ftype = futures.pop(fut) + if ftype != "eval": + continue - min_y_next = np.min(y_next) - if min_y_next < self.shared_best_y.value: - self.shared_best_y.value = min_y_next + try: + x_done, y_done = fut.result() + if isinstance(y_done, Exception): + if self.verbose: + print(f"Eval failed: {y_done}") + else: + self._update_storage_steady(x_done, y_done) + except Exception as e: + if self.verbose: + print(f"Task failed: {e}") - global_best_val = self.shared_best_y.value + initial_done_count += 1 - current_best = np.min(y_next) - if current_best < self.best_y_: - best_idx_in_new = np.argmin(y_next) - # x_next_repeated is in transformed space, convert to original for storage - self.best_x_ = self.inverse_transform_X( - x_next_repeated[best_idx_in_new].reshape(1, -1) - )[0] - self.best_y_ = current_best + # Init tensorboard and stats + self._init_tensorboard() + + if self.y_ is None or len(self.y_) == 0: + raise RuntimeError( + "All initial design evaluations failed. " + "Check your objective function for pickling issues or missing imports (e.g. numpy) in the worker process. " + "If defining functions in a notebook/script, ensure imports are inside the function." + ) + + self.update_stats() + self.get_best_xy_initial_design() + # Fit first surrogate (no lock needed — no search threads active yet) if self.verbose: - # Calculate progress - if self.max_time != np.inf and start_time is not None: - progress = (time.time() - start_time) / (self.max_time * 60) * 100 - progress_str = f"Time: {progress:.1f}%" + print( + f"Initial design evaluated. Fitting surrogate... (Data size: {len(self.y_)})" + ) + self._fit_scheduler() + + # --- Phase 2: Steady State Loop --- + if self.verbose: + print("Starting steady-state optimization loop...") + + # Candidates returned by search tasks, waiting to form the next batch. 
+ pending_cands: list = [] + + # Maps each in-flight batch_eval future → number of points it carries. + # Used for accurate budget accounting: reserved = committed + in_flight + # + pending, so we never over-dispatch search tasks. + _future_n_pts: dict = {} + + def _flush_batch() -> None: + """Dispatch all pending_cands as a single batch eval task.""" + nonlocal pending_cands + X_batch = np.vstack(pending_cands) + n_in_batch = len(pending_cands) + pending_cands = [] + if _no_gil: + # Free-threaded: call fun directly in a thread — no dill. + fut_eval = eval_pool.submit(_thread_batch_eval_task, X_batch) else: - prev_evals = sum(res.nfev for res in self.restarts_results_) - progress = (prev_evals + self.counter) / self.max_iter * 100 - progress_str = f"Evals: {progress:.1f}%" + # GIL build: serialize with dill for process isolation. + _tb_writer_temp = self.tb_writer + self.tb_writer = None + try: + pickled_args = dill.dumps((self, X_batch)) + finally: + self.tb_writer = _tb_writer_temp + fut_eval = eval_pool.submit(remote_batch_eval_wrapper, pickled_args) + futures[fut_eval] = "batch_eval" + _future_n_pts[fut_eval] = n_in_batch - msg = f"Iter {self.n_iter_}" - if global_best_val is not None: - msg += f" | GlobalBest: {global_best_val:.6f}" - msg += f" | Best: {self.best_y_:.6f} | Rate: {self.success_rate:.2f} | {progress_str}" + def _batch_ready() -> bool: + """True when pending_cands should be flushed to eval_pool.""" + if not pending_cands: + return False + if len(pending_cands) >= self.eval_batch_size: + return True + # Flush early if no search tasks remain — prevents starvation + # when budget is nearly exhausted and no new searches will run. + return not any(t == "search" for t in futures.values()) - if (self.repeats_initial > 1) or (self.repeats_surrogate > 1): - msg += f" | Mean Best: {self.min_mean_y:.6f}" + while (len(self.y_) < effective_max_iter) and ( + time.time() < timeout_start + self.max_time * 60 + ): + # 1. 
Flush pending candidates if the batch threshold is reached + # or if no search tasks are in flight (starvation guard). + if _batch_ready(): + _flush_batch() - print(msg) - elif self.verbose: - if self.max_time != np.inf and start_time is not None: - progress = (time.time() - start_time) / (self.max_time * 60) * 100 - progress_str = f"Time: {progress:.1f}%" - else: - prev_evals = sum(res.nfev for res in self.restarts_results_) - progress = (prev_evals + self.counter) / self.max_iter * 100 - progress_str = f"Evals: {progress:.1f}%" + # 2. Fill open slots with search tasks (thread pool). + n_active = len(futures) + n_slots = self.n_jobs - n_active - current_val = np.min(y_next) - msg = f"Iter {self.n_iter_}" - if global_best_val is not None: - msg += f" | GlobalBest: {global_best_val:.6f}" - msg += f" | Best: {self.best_y_:.6f} | Curr: {current_val:.6f} | Rate: {self.success_rate:.2f} | {progress_str}" + if n_slots > 0: + for _ in range(n_slots): + # Budget: committed evals + in-flight eval points + # + in-flight search tasks (each yields 1 cand) + # + candidates not yet dispatched. + n_in_flight = sum(_future_n_pts.values()) + n_searches = sum(1 for t in futures.values() if t == "search") + reserved = ( + len(self.y_) + n_in_flight + n_searches + len(pending_cands) + ) + if reserved < effective_max_iter: + # Search runs in a thread — no dill serialization; + # _thread_search_task holds _surrogate_lock. + fut = search_pool.submit(_thread_search_task) + futures[fut] = "search" + else: + break - if (self.repeats_initial > 1) or (self.repeats_surrogate > 1): - mean_y_new = np.mean(y_next) - msg += f" | Mean Curr: {mean_y_new:.6f}" - print(msg) + # 3. Flush again — new slot-filling may have pushed batch over + # the threshold (e.g. no budget left for more searches). + if _batch_ready(): + _flush_batch() - def determine_termination(self, timeout_start: float) -> str: - """Determine termination reason for optimization. 
- Checks the termination conditions and returns an appropriate message - indicating why the optimization stopped. Three possible termination - conditions are checked in order of priority: - 1. Maximum number of evaluations reached - 2. Maximum time limit exceeded - 3. Successful completion (neither limit reached) + if not futures and not pending_cands: + break # Nothing left to do - Args: - timeout_start (float): Start time of optimization (from time.time()). + if not futures: + # Pending candidates but no in-flight futures — flush now. + _flush_batch() + continue - Returns: - str: Message describing the termination reason. + # 4. Wait for any future to complete. + done, _ = wait(futures.keys(), return_when=FIRST_COMPLETED) + for fut in done: + ftype = futures.pop(fut) + try: + res = fut.result() + if isinstance(res, Exception): + if self.verbose: + print(f"Remote {ftype} failed: {res}") + # Failed search → slot freed; loop refills next iter. + # Failed batch_eval → points lost (budget not charged). 
+ _future_n_pts.pop(fut, None) + continue - Examples: - ```{python} - import numpy as np - import time - from spotoptim import SpotOptim - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - max_iter=10, - max_time=10.0 - ) - # Case 1: Maximum evaluations reached - opt.y_ = np.zeros(20) # Simulate 20 evaluations - start_time = time.time() - msg = opt.determine_termination(start_time) - print(msg) - ``` - ```{python} - # Case 2: Time limit exceeded - import numpy as np - import time - from spotoptim import SpotOptim - opt.y_ = np.zeros(10) # Only 10 evaluations - start_time = time.time() - 700 # Simulate 11.67 minutes elapsed - msg = opt.determine_termination(start_time) - print(msg) - ``` - ```{python} - # Case 3: Successful completion - import numpy as np - import time - from spotoptim import SpotOptim - opt.y_ = np.zeros(10) # Under max_iter - start_time = time.time() # Just started - msg = opt.determine_termination(start_time) - print(msg) - ``` - """ - # Determine termination reason - elapsed_time = time.time() - timeout_start - if len(self.y_) >= self.max_iter: - message = f"Optimization terminated: maximum evaluations ({self.max_iter}) reached" - elif elapsed_time >= self.max_time * 60: - message = ( - f"Optimization terminated: time limit ({self.max_time:.2f} min) reached" - ) - else: - message = "Optimization finished successfully" + if ftype == "search": + x_cand = res + pending_cands.append(x_cand) + # Flush immediately if batch threshold reached — + # avoids an extra loop iteration for batch_size=1. + if _batch_ready(): + _flush_batch() + + elif ftype == "batch_eval": + _future_n_pts.pop(fut, None) + X_done, y_done = res + if isinstance(y_done, Exception): + if self.verbose: + print(f"Batch eval failed: {y_done}") + else: + # Update storage for every point in the batch. 
+ for xi, yi in zip(X_done, y_done): + self.update_success_rate(np.array([yi])) + self._update_storage_steady(xi, yi) + self.n_iter_ += 1 + + if self.verbose: + if self.max_time != np.inf: + prog_val = ( + (time.time() - timeout_start) + / (self.max_time * 60) + * 100 + ) + progress_str = f"Time: {prog_val:.1f}%" + else: + prog_val = ( + len(self.y_) / effective_max_iter * 100 + ) + progress_str = f"Evals: {prog_val:.1f}%" - return message + print( + f"Iter {len(self.y_)}/{effective_max_iter}" + f" | Best: {self.best_y_:.6f}" + f" | Rate: {self.success_rate:.2f}" + f" | {progress_str}" + ) - def apply_ocba(self) -> Optional[np.ndarray]: - """Apply Optimal Computing Budget Allocation for noisy functions. - Determines which existing design points should be re-evaluated based on - OCBA algorithm. This method computes optimal budget allocation to improve - the quality of the estimated best design. + # Single surrogate refit for the whole batch — + # held under the lock so in-flight search threads + # do not read a partially-updated model. + with _surrogate_lock: + self._fit_scheduler() - Returns: - Optional[ndarray]: - Array of design points to re-evaluate, shape (n_re_eval, n_features). - Returns None if OCBA conditions are not met or OCBA is disabled. 
+ except Exception as e: + _future_n_pts.pop(fut, None) + if self.verbose: + print(f"Error processing future: {e}") - Note: - OCBA is only applied when: + return "FINISHED", OptimizeResult( + x=self.best_x_, + fun=self.best_y_, + nfev=len(self.y_), + nit=self.n_iter_, + success=True, + message="Optimization finished (Steady State)", + X=self.X_, + y=self.y_, + ) - * (self.repeats_initial > 1) or (self.repeats_surrogate > 1) - * self.ocba_delta > 0 - * All variances are > 0 - * At least 3 design points exist - Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1) + np.random.normal(0, 0.1, X.shape[0]), - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - repeats_surrogate=2, - ocba_delta=5, - verbose=True - ) - # Simulate optimization state (normally done in optimize()) - opt.mean_X = np.array([[1, 2], [0, 0], [2, 1]]) - opt.mean_y = np.array([5.0, 0.1, 5.0]) - opt.var_y = np.array([0.1, 0.05, 0.15]) - X_ocba = opt.apply_ocba() - # OCBA: Adding 5 re-evaluation(s). 
- # The following should be true: - print(X_ocba.shape[0] == 5) - ``` + # ==================== + # TASK_STATS: + # init_storage() + # update_storage() + # update_stats() + # update_success_rate() + # get_success_rate() + # aggregate_mean_var() + # get_best_hyperparameters + # ==================== - OCBA skipped - insufficient points - ```{python} - import numpy as np - from spotoptim import SpotOptim - opt2 = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - repeats_surrogate=2, - ocba_delta=5, - verbose=True - ) - opt2.mean_X = np.array([[1, 2], [0, 0]]) - opt2.mean_y = np.array([5.0, 0.1]) - opt2.var_y = np.array([0.1, 0.05]) - X_ocba = opt2.apply_ocba() - # Warning: OCBA skipped (need >2 points with variance > 0) - print(X_ocba is None) - ``` + def get_best_hyperparameters( + self, as_dict: bool = True + ) -> Union[Dict[str, Any], np.ndarray, None]: """ - # OCBA: Compute optimal budget allocation for noisy functions - # This determines which existing design points should be re-evaluated - X_ocba = None - if ( - self.repeats_initial > 1 or self.repeats_surrogate > 1 - ) and self.ocba_delta > 0: - # Check conditions for OCBA (need variance > 0 and at least 3 points) - if not np.all(self.var_y > 0) and (self.mean_X.shape[0] <= 2): - if self.verbose: - print("Warning: OCBA skipped (need >2 points with variance > 0)") - elif np.all(self.var_y > 0) and (self.mean_X.shape[0] > 2): - # Get OCBA allocation - X_ocba = self.get_ocba_X( - self.mean_X, - self.mean_y, - self.var_y, - self.ocba_delta, - verbose=self.verbose, - ) - if self.verbose and X_ocba is not None: - print(f" OCBA: Adding {X_ocba.shape[0]} re-evaluation(s)") - - return X_ocba - - def apply_penalty_NA( - self, - y: np.ndarray, - y_history: Optional[np.ndarray] = None, - penalty_value: Optional[float] = None, - sd: float = 0.1, - ) -> np.ndarray: - """Replace NaN and infinite values with penalty plus random noise. - Used in the optimize() method after function evaluations. 
- This method follows the approach from spotpython.utils.repair.apply_penalty_NA, - replacing NaN/inf values with a penalty value plus random noise to avoid - identical penalty values. + Get the best hyperparameter configuration found during optimization. + If noise handling is active (repeats_initial > 1 or OCBA), this returns the parameter + configuration associated with the best *mean* objective value. Otherwise, it returns + the configuration associated with the absolute best observed value. Args: - y (ndarray): Array of objective function values to be repaired. - y_history (ndarray, optional): Historical objective function values used for - computing penalty statistics. If None, uses y itself. Default is None. - penalty_value (float, optional): Value to replace NaN/inf with. - If None, computes penalty as: max(finite_y_history) + 3 * std(finite_y_history). - If all values are NaN/inf or only one finite value exists, falls back - to self.penalty_val. Default is None. - sd (float): Standard deviation for normal distributed random noise added to penalty. - Default is 0.1. + as_dict (bool, optional): If True, returns a dictionary mapping parameter names + to their values. If False, returns the raw numpy array. Defaults to True. Returns: - ndarray: - Array with NaN/inf replaced by penalty_value + random noise - (normal distributed with mean 0 and standard deviation sd). + Union[Dict[str, Any], np.ndarray, None]: The best hyperparameter configuration. + Returns None if optimization hasn't started (no data). 
Examples: ```{python} - import numpy as np from spotoptim import SpotOptim - opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), bounds=[(-5, 5)]) - y_hist = np.array([1.0, 2.0, 3.0, 5.0]) - y_new = np.array([4.0, np.nan, np.inf]) - y_clean = opt.apply_penalty_NA(y_new, y_history=y_hist) - print(f"np.all(np.isfinite(y_clean)): {np.all(np.isfinite(y_clean))}") - print(f"y_clean: {y_clean}") - # NaN/inf replaced with worst value from history + 3*std + noise - print(f"y_clean[1] > 5.0: {y_clean[1] > 5.0}") # Should be larger than max finite value in history + from spotoptim.function import sphere + + opt = SpotOptim(fun=sphere, + bounds=[(-5, 5), (0, 10)], + n_initial=5, + var_name=["x", "y"], + verbose=True) + opt.optimize() + best_params = opt.get_best_hyperparameters() + print(best_params['x']) # Should be close to 0 ``` """ + if self.X_ is None or len(self.X_) == 0: + return None - # Ensure y is a float array (maps non-convertible values like "error" or None to nan) - def _safe_float(v): - try: - return float(v) - except (ValueError, TypeError): - return np.nan - - y_flat = np.array(y).flatten() - y = np.array([_safe_float(v) for v in y_flat]) - # Identify NaN and inf values in y - mask = ~np.isfinite(y) - - if np.any(mask): - n_bad = np.sum(mask) + # Determine which "best" to use + if (self.repeats_initial > 1 or self.repeats_surrogate > 1) and hasattr( + self, "min_mean_X" + ): + best_x = self.min_mean_X + else: + best_x = self.best_x_ - # Compute penalty_value if not provided - if penalty_value is None: - # Get finite values from history for statistics - # Use y_history if provided, otherwise fall back to y itself - if y_history is not None: - finite_values = y_history[np.isfinite(y_history)] - else: - # Use current y values - finite_values = y[~mask] + if not as_dict: + return best_x - # If we have at least 2 finite values, compute adaptive penalty - if len(finite_values) >= 2: - max_y = np.max(finite_values) - std_y = np.std(finite_values, ddof=1) - 
penalty_value = max_y + 3.0 * std_y + # Map factors using existing method (handles 2D, returns 2D) + # We pass best_x as (1, D) and get (1, D) back + mapped_x = self.map_to_factor_values(best_x.reshape(1, -1))[0] - if self.verbose: - print( - f"Warning: Found {n_bad} NaN/inf value(s), replacing with " - f"adaptive penalty (max + 3*std = {penalty_value:.4f})" - ) - else: - # Fallback to self.penalty if insufficient finite values - if self.penalty_val is not None: - penalty_value = self.penalty_val - elif len(finite_values) == 1: - # Use the single finite value + a large constant - penalty_value = finite_values[0] + 1000.0 - else: - # All values are NaN/inf, use a large default - penalty_value = 1e10 + # Convert to dictionary with types + params = {} + names = ( + self.var_name if self.var_name else [f"p{i}" for i in range(len(best_x))] + ) - if self.verbose: - print( - f"Warning: Found {n_bad} NaN/inf value(s), insufficient finite values " - f"for adaptive penalty. Using penalty_value = {penalty_value}" - ) - else: - if self.verbose: - print( - f"Warning: Found {n_bad} NaN/inf value(s), replacing with {penalty_value} + noise" - ) + for i, name in enumerate(names): + val = mapped_x[i] - # Generate random noise and add to penalty - random_noise = self.rng.normal(0, sd, y.shape) - penalty_values = penalty_value + random_noise + # Handle types if available (specifically int, as factors are already mapped) + if self.var_type: + v_type = self.var_type[i] + if v_type == "int": + val = int(round(val)) - # Replace NaN/inf with penalty + noise - y[mask] = penalty_values[mask] + params[name] = val - return y + return params - # ==================== - # Storage & Statistics - # ==================== def init_storage(self, X0: np.ndarray, y0: np.ndarray) -> None: """Initialize storage for optimization. 
@@ -6020,7 +6113,6 @@ def aggregate_mean_var( self, X: np.ndarray, y: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Aggregate X and y values to compute mean and variance per group. - For repeated evaluations at the same design point, this method computes the mean function value and variance (using population variance, ddof=0). @@ -6081,7 +6173,21 @@ def aggregate_mean_var( return X_agg, y_mean, y_var # ==================== - # Results & Analysis + # TASK_RESULTS: + # save_result() + # load_result() + # save_experiment() + # load_experiment() + # get_result_filename() + # get_experiment_filename() + # print_results() + # print_best() + # get_results_table() + # get_design_table() + # gen_design_table() + # get_importance() + # sensitivity_spearman() + # get_stars() # ==================== def save_result( @@ -6167,7 +6273,6 @@ def sphere(X): @staticmethod def load_result(filename: str) -> "SpotOptim": """Load complete optimization results from a pickle file (suffix '_res.pkl') - Loads results that were saved with save_result(). The loaded optimizer will have both configuration and all optimization results. 
@@ -7117,7 +7222,14 @@ def test_func(X): return output_list # ==================== - # TensorBoard Integration + # TASK_TENSORBOARD: + # _clean_tensorboard_logs() + # _init_tensorboard_writer() + # _write_tensorboard_scalars() + # _write_tensorboard_hparams() + # _close_tensorboard_writer() + # init_tensorboard() + # _close_and_del_tensorboard_writer() # ==================== def _clean_tensorboard_logs(self) -> None: @@ -7352,7 +7464,13 @@ def _close_and_del_tensorboard_writer(self) -> None: self.tb_writer = None # ==================== - # Plotting + # TASK_PLOT: + # plot_progress() + # plot_surrogate() + # plot_important_hyperparameter_contour() + # _plot_surrogate_with_factors() + # plot_importance() + # plot_parameter_scatter() # ==================== def plot_progress( diff --git a/tests/test_optimize_refactored_methods.py b/tests/test_optimize_refactored_methods.py index 60d1850..fe6b7c0 100644 --- a/tests/test_optimize_refactored_methods.py +++ b/tests/test_optimize_refactored_methods.py @@ -156,7 +156,7 @@ def test_handle_na_removes_nan_values(self): X0 = np.array([[1, 2], [3, 4], [5, 6]]) y0 = np.array([5.0, np.nan, 61.0]) - X0_clean, y0_clean, n_eval = opt.rm_NA_values(X0, y0) + X0_clean, y0_clean, n_eval = opt.rm_initial_design_NA_values(X0, y0) assert X0_clean.shape == (2, 2) assert len(y0_clean) == 2 @@ -174,7 +174,7 @@ def test_handle_na_removes_inf_values(self): X0 = np.array([[1, 2], [3, 4], [5, 6]]) y0 = np.array([5.0, np.inf, 61.0]) - X0_clean, y0_clean, n_eval = opt.rm_NA_values(X0, y0) + X0_clean, y0_clean, n_eval = opt.rm_initial_design_NA_values(X0, y0) assert X0_clean.shape == (2, 2) assert len(y0_clean) == 2 @@ -191,7 +191,7 @@ def test_handle_na_all_valid_values(self): X0 = np.array([[1, 2], [3, 4], [5, 6]]) y0 = np.array([5.0, 25.0, 61.0]) - X0_clean, y0_clean, n_eval = opt.rm_NA_values(X0, y0) + X0_clean, y0_clean, n_eval = opt.rm_initial_design_NA_values(X0, y0) assert X0_clean.shape == (3, 2) assert len(y0_clean) == 3 diff --git 
a/tests/test_run_sequential_loop_example.py b/tests/test_run_sequential_loop_example.py index f1ea90f..4cfb6b4 100644 --- a/tests/test_run_sequential_loop_example.py +++ b/tests/test_run_sequential_loop_example.py @@ -21,7 +21,7 @@ def test_run_sequential_loop_example(): ) X0, y0 = opt._initialize_run(X0=None, y0_known=None) - X0, y0, n_evaluated = opt.rm_NA_values(X0, y0) + X0, y0, n_evaluated = opt.rm_initial_design_NA_values(X0, y0) opt.check_size_initial_design(y0, n_evaluated) opt.init_storage(X0, y0) opt._zero_success_count = 0