diff --git a/.github/workflows/release-preflight.yml b/.github/workflows/release-preflight.yml index 7e905e4..6d11cea 100644 --- a/.github/workflows/release-preflight.yml +++ b/.github/workflows/release-preflight.yml @@ -49,6 +49,7 @@ jobs: - name: Preflight token and push checks env: RELEASE_TOKEN: ${{ secrets.SEMANTIC_RELEASE_TOKEN || github.token }} + GH_TOKEN: ${{ secrets.SEMANTIC_RELEASE_TOKEN || github.token }} run: | set -euo pipefail @@ -60,14 +61,13 @@ jobs: echo "Checking authenticated read access to ${GITHUB_REPOSITORY}..." git ls-remote --exit-code origin HEAD >/dev/null - echo "Checking dry-run push permission to main branch..." - git push --dry-run origin HEAD:refs/heads/main >/dev/null - - temp_tag="preflight-sr-${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}" - git tag "${temp_tag}" HEAD - trap 'git tag -d "${temp_tag}" >/dev/null 2>&1 || true' EXIT + echo "Checking push permission via GitHub API..." + push_perm=$(gh api "repos/${GITHUB_REPOSITORY}" --jq '.permissions.push' 2>/dev/null || echo "false") + if [ "${push_perm}" != "true" ]; then + echo "::error::Token does not have push permission to ${GITHUB_REPOSITORY}. Set SEMANTIC_RELEASE_TOKEN to a PAT with repo write access." + exit 1 + fi - echo "Checking dry-run push permission for tags..." - git push --dry-run origin "refs/tags/${temp_tag}" >/dev/null + echo "Checking tag push permission (same as push)... already verified above." echo "Preflight passed: token and push permissions are sufficient for semantic-release." diff --git a/src/spotoptim/SpotOptim.py b/src/spotoptim/SpotOptim.py index 7eb12e6..48d9e7f 100644 --- a/src/spotoptim/SpotOptim.py +++ b/src/spotoptim/SpotOptim.py @@ -1006,10 +1006,6 @@ def __dir__(self): return list(d) - # ==================== - # Configuration & Helpers - # ==================== - def set_seed(self) -> None: """Set global random seeds for reproducibility. 
Sets seeds for: @@ -1039,7 +1035,12 @@ def set_seed(self) -> None: torch.cuda.manual_seed_all(self.seed) # ==================== - # var_type and bounds related methods + # TASK_VARS: + # detect_var_type + # modify_bounds_based_on_var_type + # repair_non_numeric + # handle_default_var_trans + # process_factor_bounds # ==================== def detect_var_type(self) -> list: @@ -1265,80 +1266,9 @@ def process_factor_bounds(self) -> None: self.bounds = processed_bounds # ==================== - # Statistics and Results: best hyperparameters - # ==================== - - def get_best_hyperparameters( - self, as_dict: bool = True - ) -> Union[Dict[str, Any], np.ndarray, None]: - """ - Get the best hyperparameter configuration found during optimization. - If noise handling is active (repeats_initial > 1 or OCBA), this returns the parameter - configuration associated with the best *mean* objective value. Otherwise, it returns - the configuration associated with the absolute best observed value. - - Args: - as_dict (bool, optional): If True, returns a dictionary mapping parameter names - to their values. If False, returns the raw numpy array. Defaults to True. - - Returns: - Union[Dict[str, Any], np.ndarray, None]: The best hyperparameter configuration. - Returns None if optimization hasn't started (no data). 
- - Examples: - ```{python} - from spotoptim import SpotOptim - from spotoptim.function import sphere - - opt = SpotOptim(fun=sphere, - bounds=[(-5, 5), (0, 10)], - n_initial=5, - var_name=["x", "y"], - verbose=True) - opt.optimize() - best_params = opt.get_best_hyperparameters() - print(best_params['x']) # Should be close to 0 - ``` - """ - if self.X_ is None or len(self.X_) == 0: - return None - - # Determine which "best" to use - if (self.repeats_initial > 1 or self.repeats_surrogate > 1) and hasattr( - self, "min_mean_X" - ): - best_x = self.min_mean_X - else: - best_x = self.best_x_ - - if not as_dict: - return best_x - - # Map factors using existing method (handles 2D, returns 2D) - # We pass best_x as (1, D) and get (1, D) back - mapped_x = self.map_to_factor_values(best_x.reshape(1, -1))[0] - - # Convert to dictionary with types - params = {} - names = ( - self.var_name if self.var_name else [f"p{i}" for i in range(len(best_x))] - ) - - for i, name in enumerate(names): - val = mapped_x[i] - - # Handle types if available (specifically int, as factors are already mapped) - if self.var_type: - v_type = self.var_type[i] - if v_type == "int": - val = int(round(val)) - - params[name] = val - - return params - - # ==================== - # Save, Load, and Reinitialization of Components (Pickling) + # TASK_SAVE_LOAD: + # get_pickle_safe_optimizer + # reinitialize_components # ==================== def get_pickle_safe_optimizer( @@ -1446,153 +1376,12 @@ def reinitialize_components(self) -> None: if not hasattr(self, "surrogate") or self.surrogate is None: self.init_surrogate() - # ==================== - # Surrogate - # ==================== - - def init_surrogate(self) -> None: - """Initialize or configure the surrogate model for optimization. Handles three surrogate configurations: - * List of surrogates: sets up multi-surrogate selection with probability weights and per-surrogate `max_surrogate_points`. 
- * None (default): creates a `GaussianProcessRegressor` with a - `ConstantKernel * Matern(nu=2.5)` kernel, 100 optimizer restarts, - and `normalize_y=True`. - * User-provided surrogate: accepted as-is; internal bookkeeping - attributes (`_max_surrogate_points_list`, - `_active_max_surrogate_points`) are still initialised. - After this method returns the following attributes are set: - * `self.surrogate` — the active surrogate model. - * `self._surrogates_list` — `list | None`. - * `self._prob_surrogate` — normalised selection probabilities or `None`. - * `self._max_surrogate_points_list` — per-surrogate point caps or `None`. - * `self._active_max_surrogate_points` — active cap. - - Raises: - ValueError: If the surrogate list is empty. - ValueError: If 'prob_surrogate' length does not match the surrogate list length. - ValueError: If 'max_surrogate_points' list length does not match the surrogate list length. - - Returns: - None - - Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - # Default surrogate (GaussianProcessRegressor) - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - ) - print(type(opt.surrogate).__name__) - ``` - - ```{python} - import numpy as np - from spotoptim import SpotOptim - from sklearn.ensemble import RandomForestRegressor - # User-provided surrogate - rf = RandomForestRegressor(n_estimators=50, random_state=42) - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - surrogate=rf, - ) - print(type(opt.surrogate).__name__) - ``` - - ```{python} - import numpy as np - from spotoptim import SpotOptim - from sklearn.ensemble import RandomForestRegressor - from sklearn.gaussian_process import GaussianProcessRegressor - # List of surrogates with selection probabilities - surrogates = [GaussianProcessRegressor(), RandomForestRegressor()] - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], 
- n_initial=5, - surrogate=surrogates, - prob_surrogate=[0.7, 0.3], - ) - print(opt._prob_surrogate) - print([type(s).__name__ for s in opt._surrogates_list]) - ``` - """ - self._surrogates_list = None - self._prob_surrogate = None - - if isinstance(self.surrogate, list): - self._surrogates_list = self.surrogate - if not self._surrogates_list: - raise ValueError("Surrogate list cannot be empty.") - - # Handle probabilities - if self.config.prob_surrogate is None: - # Uniform probability - n = len(self._surrogates_list) - self._prob_surrogate = [1.0 / n] * n - else: - probs = self.config.prob_surrogate - if len(probs) != len(self._surrogates_list): - raise ValueError( - f"Length of prob_surrogate ({len(probs)}) must match " - f"number of surrogates ({len(self._surrogates_list)})." - ) - # Normalize probabilities - total = sum(probs) - if not np.isclose(total, 1.0) and total > 0: - self._prob_surrogate = [p / total for p in probs] - else: - self._prob_surrogate = probs - - # Handle max_surrogate_points list - self._max_surrogate_points_list = None - if isinstance(self.config.max_surrogate_points, list): - if len(self.config.max_surrogate_points) != len(self._surrogates_list): - raise ValueError( - f"Length of max_surrogate_points ({len(self.config.max_surrogate_points)}) " - f"must match number of surrogates ({len(self._surrogates_list)})." 
- ) - self._max_surrogate_points_list = self.config.max_surrogate_points - else: - # If int or None, broadcast to list for easier indexing - self._max_surrogate_points_list = [ - self.config.max_surrogate_points - ] * len(self._surrogates_list) - - # Set initial surrogate and max points - self.surrogate = self._surrogates_list[0] - self._active_max_surrogate_points = self._max_surrogate_points_list[0] - - elif self.surrogate is None: - # Default single surrogate case - self._max_surrogate_points_list = None - self._active_max_surrogate_points = self.config.max_surrogate_points - - kernel = ConstantKernel(1.0, (1e-2, 1e12)) * Matern( - length_scale=1.0, length_scale_bounds=(1e-4, 1e2), nu=2.5 - ) - - # Determine optimizer for GPR - optimizer = "fmin_l_bfgs_b" # Default used by sklearn - if self.config.acquisition_optimizer_kwargs is not None: - optimizer = partial( - gpr_minimize_wrapper, **self.config.acquisition_optimizer_kwargs - ) - - self.surrogate = GaussianProcessRegressor( - kernel=kernel, - n_restarts_optimizer=100, - normalize_y=True, - random_state=self.seed, - optimizer=optimizer, - ) - # ==================== - # Dimension Reduction + # TASK_DIM: + # setup_dimension_reduction() + # to_red_dim() + # to_all_dim() # ==================== def setup_dimension_reduction(self) -> None: @@ -1780,7 +1569,13 @@ def sphere(X): return X_full # ==================== - # Variable Transformation and Mapping + # TASK_TRANSFORM: + # transform_value() + # inverse_transform_value() + # transform_X() + # inverse_transform_X() + # transform_bounds() + # map_to_factor_values() # ==================== def transform_value(self, x: float, trans: Optional[str]) -> float: @@ -2113,7 +1908,14 @@ def map_to_factor_values(self, X: np.ndarray) -> np.ndarray: return X_mapped # ==================== - # Initial Design + # TASK_INIT_DESIGN: + # get_initial_design() + # generate_initial_design() + # curate_initial_design() + # rm_initial_design_NA_values() + # validate_x0() + # 
check_size_initial_design() + # get_best_xy_initial_design() # ==================== def get_initial_design(self, X0: Optional[np.ndarray] = None) -> np.ndarray: @@ -2331,7 +2133,7 @@ def curate_initial_design(self, X0: np.ndarray) -> np.ndarray: return X0 - def rm_NA_values( + def rm_initial_design_NA_values( self, X0: np.ndarray, y0: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, int]: """Remove NaN/inf values from initial design evaluations. @@ -2362,14 +2164,14 @@ def rm_NA_values( ) X0 = np.array([[1, 2], [3, 4], [5, 6]]) y0 = np.array([5.0, np.nan, np.inf]) - X0_clean, y0_clean, n_eval = opt.rm_NA_values(X0, y0) + X0_clean, y0_clean, n_eval = opt.rm_initial_design_NA_values(X0, y0) print(X0_clean.shape) # (1, 2) print(y0_clean) # array([5.]) print(n_eval) # 3 # All valid values - no filtering X0 = np.array([[1, 2], [3, 4]]) y0 = np.array([5.0, 25.0]) - X0_clean, y0_clean, n_eval = opt.rm_NA_values(X0, y0) + X0_clean, y0_clean, n_eval = opt.rm_initial_design_NA_values(X0, y0) print(X0_clean.shape) # (2, 2) print(n_eval) # 2 ``` @@ -2666,62 +2468,156 @@ def get_best_xy_initial_design(self) -> None: print(f"Initial best: f(x) = {self.best_y_:.6f}") - def remove_nan( - self, X: np.ndarray, y: np.ndarray, stop_on_zero_return: bool = True - ) -> tuple: - """Remove rows where y contains NaN or inf values. - Used in the optimize() method after function evaluations. - - Args: - X (ndarray): Design matrix, shape (n_samples, n_features). - y (ndarray): Objective values, shape (n_samples,). - stop_on_zero_return (bool): If True, raise error when all values are removed. + # ==================== + # TASK_Surrogate: + # init_surrogate() + # _fit_surrogate() + # _fit_scheduler() + # ==================== - Returns: - tuple: (X_clean, y_clean) with NaN/inf rows removed. + def init_surrogate(self) -> None: + """Initialize or configure the surrogate model for optimization. 
Handles three surrogate configurations: + * List of surrogates: sets up multi-surrogate selection with probability weights and per-surrogate `max_surrogate_points`. + * None (default): creates a `GaussianProcessRegressor` with a + `ConstantKernel * Matern(nu=2.5)` kernel, 100 optimizer restarts, + and `normalize_y=True`. + * User-provided surrogate: accepted as-is; internal bookkeeping + attributes (`_max_surrogate_points_list`, + `_active_max_surrogate_points`) are still initialised. + After this method returns the following attributes are set: + * `self.surrogate` — the active surrogate model. + * `self._surrogates_list` — `list | None`. + * `self._prob_surrogate` — normalised selection probabilities or `None`. + * `self._max_surrogate_points_list` — per-surrogate point caps or `None`. + * `self._active_max_surrogate_points` — active cap. Raises: - ValueError: If all values are NaN/inf and stop_on_zero_return is True. + ValueError: If the surrogate list is empty. + ValueError: If 'prob_surrogate' length does not match the surrogate list length. + ValueError: If 'max_surrogate_points' list length does not match the surrogate list length. 
+ + Returns: + None Examples: ```{python} import numpy as np from spotoptim import SpotOptim - from spotoptim.function import sphere - opt = SpotOptim(fun=sphere, bounds=[(-5, 5)]) - X = np.array([[1, 2], [3, 4], [5, 6]]) - y = np.array([1.0, np.nan, np.inf]) - X_clean, y_clean = opt.remove_nan(X, y, stop_on_zero_return=False) - print("Clean X:", X_clean) - print("Clean y:", y_clean) + # Default surrogate (GaussianProcessRegressor) + opt = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + ) + print(type(opt.surrogate).__name__) + ``` + + ```{python} + import numpy as np + from spotoptim import SpotOptim + from sklearn.ensemble import RandomForestRegressor + # User-provided surrogate + rf = RandomForestRegressor(n_estimators=50, random_state=42) + opt = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + surrogate=rf, + ) + print(type(opt.surrogate).__name__) + ``` + + ```{python} + import numpy as np + from spotoptim import SpotOptim + from sklearn.ensemble import RandomForestRegressor + from sklearn.gaussian_process import GaussianProcessRegressor + # List of surrogates with selection probabilities + surrogates = [GaussianProcessRegressor(), RandomForestRegressor()] + opt = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + surrogate=surrogates, + prob_surrogate=[0.7, 0.3], + ) + print(opt._prob_surrogate) + print([type(s).__name__ for s in opt._surrogates_list]) ``` """ - # Find finite values - finite_mask = np.isfinite(y) + self._surrogates_list = None + self._prob_surrogate = None - if not np.any(finite_mask): - msg = "All objective function values are NaN or inf." 
- if stop_on_zero_return: - raise ValueError(msg) + if isinstance(self.surrogate, list): + self._surrogates_list = self.surrogate + if not self._surrogates_list: + raise ValueError("Surrogate list cannot be empty.") + + # Handle probabilities + if self.config.prob_surrogate is None: + # Uniform probability + n = len(self._surrogates_list) + self._prob_surrogate = [1.0 / n] * n else: - if self.verbose: - print(f"Warning: {msg} Returning empty arrays.") - return np.array([]).reshape(0, X.shape[1]), np.array([]) + probs = self.config.prob_surrogate + if len(probs) != len(self._surrogates_list): + raise ValueError( + f"Length of prob_surrogate ({len(probs)}) must match " + f"number of surrogates ({len(self._surrogates_list)})." + ) + # Normalize probabilities + total = sum(probs) + if not np.isclose(total, 1.0) and total > 0: + self._prob_surrogate = [p / total for p in probs] + else: + self._prob_surrogate = probs - # Filter out non-finite values - n_removed = np.sum(~finite_mask) - if n_removed > 0 and self.verbose: - print(f"Warning: Removed {n_removed} sample(s) with NaN/inf values") + # Handle max_surrogate_points list + self._max_surrogate_points_list = None + if isinstance(self.config.max_surrogate_points, list): + if len(self.config.max_surrogate_points) != len(self._surrogates_list): + raise ValueError( + f"Length of max_surrogate_points ({len(self.config.max_surrogate_points)}) " + f"must match number of surrogates ({len(self._surrogates_list)})." 
+ ) + self._max_surrogate_points_list = self.config.max_surrogate_points + else: + # If int or None, broadcast to list for easier indexing + self._max_surrogate_points_list = [ + self.config.max_surrogate_points + ] * len(self._surrogates_list) - return X[finite_mask], y[finite_mask] + # Set initial surrogate and max points + self.surrogate = self._surrogates_list[0] + self._active_max_surrogate_points = self._max_surrogate_points_list[0] - # ==================== - # Surrogate & Acquisition - # ==================== + elif self.surrogate is None: + # Default single surrogate case + self._max_surrogate_points_list = None + self._active_max_surrogate_points = self.config.max_surrogate_points + + kernel = ConstantKernel(1.0, (1e-2, 1e12)) * Matern( + length_scale=1.0, length_scale_bounds=(1e-4, 1e2), nu=2.5 + ) + + # Determine optimizer for GPR + optimizer = "fmin_l_bfgs_b" # Default used by sklearn + if self.config.acquisition_optimizer_kwargs is not None: + optimizer = partial( + gpr_minimize_wrapper, **self.config.acquisition_optimizer_kwargs + ) + + self.surrogate = GaussianProcessRegressor( + kernel=kernel, + n_restarts_optimizer=100, + normalize_y=True, + random_state=self.seed, + optimizer=optimizer, + ) def _fit_surrogate(self, X: np.ndarray, y: np.ndarray) -> None: """Fit surrogate model to data. - Used in optimize() to fit the surrogate model. + Used by _fit_scheduler() to fit the surrogate model. If the number of points exceeds `self.max_surrogate_points`, a subset of points is selected using the selection dispatcher. @@ -2832,6 +2728,12 @@ def _fit_scheduler(self) -> None: X_for_surrogate = self.transform_X(self.X_) self._fit_surrogate(X_for_surrogate, self.y_) + # ==================== + # TASK_PREDICT: + # _predict_with_uncertainty() + # _acquisition_function() + # ==================== + def _predict_with_uncertainty(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Predict with uncertainty estimates, handling surrogates without return_std. 
Used in the _acquisition_function() method and in the plot_surrogate() method. @@ -2954,6 +2856,102 @@ def _acquisition_function(self, x: np.ndarray) -> np.ndarray: raise ValueError(f"Unknown acquisition function: {self.acquisition}") + # ==================== + # TASK_OPTIM: + # evaluate_function() + # _optimize_acquisition_tricands() + # _prepare_de_kwargs() + # _optimize_acquisition_de() + # _optimize_acquisition_scipy() + # _try_optimizer_candidates() + # remove_nan() + # _handle_acquisition_failure() + # _try_fallback_strategy() + # get_shape() + # ==================== + + def evaluate_function(self, X: np.ndarray) -> np.ndarray: + """Evaluate objective function at points X. + Used in the optimize() method to evaluate the objective function. + + Input Space: `X` is expected in Transformed and Mapped Space (Internal scale, Reduced dimensions). + Process as follows: + 1. Expands `X` to Transformed Space (Full dimensions) if dimension reduction is active. + 2. Inverse transforms `X` to Natural Space (Original scale). + 3. Evaluates the user function with points in Natural Space. + + If dimension reduction is active, expands `X` to full dimensions before evaluation. + Supports both single-objective and multi-objective functions. For multi-objective + functions, converts to single-objective using `mo2so` method. + + Args: + X (ndarray): Points to evaluate in Transformed and Mapped Space, shape (n_samples, n_reduced_features). + + Returns: + ndarray: Function values, shape (n_samples,). 
+ + Examples: + ```{python} + import numpy as np + from spotoptim import SpotOptim + # Single-objective function + opt_so = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + max_iter=10, + n_initial=5 + ) + X = np.array([[1.0, 2.0], [3.0, 4.0]]) + y = opt_so.evaluate_function(X) + print(f"Single-objective output: {y}") + ``` + + ```{python} + import numpy as np + from spotoptim import SpotOptim + # Multi-objective function (default: use first objective) + opt_mo = SpotOptim( + fun=lambda X: np.column_stack([ + np.sum(X**2, axis=1), + np.sum((X-1)**2, axis=1) + ]), + bounds=[(-5, 5), (-5, 5)], + max_iter=10, + n_initial=5 + ) + y_mo = opt_mo.evaluate_function(X) + print(f"Multi-objective output (first obj): {y_mo}") + ``` + """ + # Ensure X is 2D + X = np.atleast_2d(X) + + # Expand to full dimensions if needed + if self.red_dim: + X = self.to_all_dim(X) + + # Apply inverse transformations to get original scale for function evaluation + X_original = self.inverse_transform_X(X) + + # Map factor variables to original string values + X_for_eval = self.map_to_factor_values(X_original) + + # Evaluate function + y_raw = self.fun(X_for_eval, *self.args, **self.kwargs) + + # Convert to numpy array if needed + if not isinstance(y_raw, np.ndarray): + y_raw = np.array([y_raw]) + + # Handle multi-objective case + y = self.mo2so(y_raw) + + # Ensure y is 1D + if y.ndim > 1: + y = y.ravel() + + return y + def _optimize_acquisition_tricands(self) -> np.ndarray: """Optimize using geometric infill strategy via triangulation candidates. @@ -3333,39 +3331,89 @@ def is_valid(p, reference_set): return valid_candidates - def _handle_acquisition_failure(self) -> np.ndarray: - """Handle acquisition failure by proposing new design points. - Used in the suggest_next_infill_point() method. - This method is called when no new design points can be suggested - by the surrogate model (e.g., when the proposed point is too close - to existing points). 
It proposes a random space-filling design as a fallback. + def remove_nan( + self, X: np.ndarray, y: np.ndarray, stop_on_zero_return: bool = True + ) -> tuple: + """Remove rows where y contains NaN or inf values. + Used in the optimize() method after function evaluations. + + Args: + X (ndarray): Design matrix, shape (n_samples, n_features). + y (ndarray): Objective values, shape (n_samples,). + stop_on_zero_return (bool): If True, raise error when all values are removed. Returns: - ndarray: New design point as a fallback, shape (n_features,). + tuple: (X_clean, y_clean) with NaN/inf rows removed. + + Raises: + ValueError: If all values are NaN/inf and stop_on_zero_return is True. Examples: - >>> import numpy as np - >>> from spotoptim import SpotOptim - >>> opt = SpotOptim( - ... fun=lambda X: np.sum(X**2, axis=1), - ... bounds=[(-5, 5), (-5, 5)], - ... acquisition_failure_strategy='random' - ... ) - >>> opt.X_ = np.array([[0, 0], [1, 1]]) - >>> opt.y_ = np.array([0, 2]) - >>> x_fallback = opt._handle_acquisition_failure() - >>> x_fallback.shape - (2,) - >>> print(x_fallback) - [some new point within bounds] - """ - if self.acquisition_failure_strategy == "random": - # Default: random space-filling design (Latin Hypercube Sampling) - if self.verbose: - print( - "Acquisition failure: Using random space-filling design as fallback." - ) - x_new_unit = self.lhs_sampler.random(n=1)[0] + ```{python} + import numpy as np + from spotoptim import SpotOptim + from spotoptim.function import sphere + opt = SpotOptim(fun=sphere, bounds=[(-5, 5)]) + X = np.array([[1, 2], [3, 4], [5, 6]]) + y = np.array([1.0, np.nan, np.inf]) + X_clean, y_clean = opt.remove_nan(X, y, stop_on_zero_return=False) + print("Clean X:", X_clean) + print("Clean y:", y_clean) + ``` + """ + # Find finite values + finite_mask = np.isfinite(y) + + if not np.any(finite_mask): + msg = "All objective function values are NaN or inf." 
+ if stop_on_zero_return: + raise ValueError(msg) + else: + if self.verbose: + print(f"Warning: {msg} Returning empty arrays.") + return np.array([]).reshape(0, X.shape[1]), np.array([]) + + # Filter out non-finite values + n_removed = np.sum(~finite_mask) + if n_removed > 0 and self.verbose: + print(f"Warning: Removed {n_removed} sample(s) with NaN/inf values") + + return X[finite_mask], y[finite_mask] + + + def _handle_acquisition_failure(self) -> np.ndarray: + """Handle acquisition failure by proposing new design points. + Used in the suggest_next_infill_point() method. + This method is called when no new design points can be suggested + by the surrogate model (e.g., when the proposed point is too close + to existing points). It proposes a random space-filling design as a fallback. + + Returns: + ndarray: New design point as a fallback, shape (n_features,). + + Examples: + >>> import numpy as np + >>> from spotoptim import SpotOptim + >>> opt = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... bounds=[(-5, 5), (-5, 5)], + ... acquisition_failure_strategy='random' + ... ) + >>> opt.X_ = np.array([[0, 0], [1, 1]]) + >>> opt.y_ = np.array([0, 2]) + >>> x_fallback = opt._handle_acquisition_failure() + >>> x_fallback.shape + (2,) + >>> print(x_fallback) + [some new point within bounds] + """ + if self.acquisition_failure_strategy == "random": + # Default: random space-filling design (Latin Hypercube Sampling) + if self.verbose: + print( + "Acquisition failure: Using random space-filling design as fallback." 
+ ) + x_new_unit = self.lhs_sampler.random(n=1)[0] x_new = self.lower + x_new_unit * (self.upper - self.lower) return self.repair_non_numeric(x_new.reshape(1, -1), self.var_type)[0] @@ -3461,6 +3509,14 @@ def get_shape(self, y: np.ndarray) -> Tuple[int, Optional[int]]: # For higher dimensions, flatten to 1D return y.size, None + + # ==================== + # TASK_MO: + # store_mo() + # mo2so() + # ==================== + + def store_mo(self, y_mo: np.ndarray) -> None: """Store multi-objective values in self.y_mo. If multi-objective values are present (ndim==2), they are stored in self.y_mo. @@ -3576,6 +3632,100 @@ def custom_mo2so(y_mo): return y0 + # ==================== + # TASK_OCBA: + # apply_ocba() + # get_ranks() + # get_ocba() + # get_ocba_X() + # ==================== + + + def apply_ocba(self) -> Optional[np.ndarray]: + """Apply Optimal Computing Budget Allocation for noisy functions. + Determines which existing design points should be re-evaluated based on + OCBA algorithm. This method computes optimal budget allocation to improve + the quality of the estimated best design. + + Returns: + Optional[ndarray]: + Array of design points to re-evaluate, shape (n_re_eval, n_features). + Returns None if OCBA conditions are not met or OCBA is disabled. 
+ + Note: + OCBA is only applied when: + + * (self.repeats_initial > 1) or (self.repeats_surrogate > 1) + * self.ocba_delta > 0 + * All variances are > 0 + * At least 3 design points exist + + Examples: + ```{python} + import numpy as np + from spotoptim import SpotOptim + opt = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1) + np.random.normal(0, 0.1, X.shape[0]), + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + repeats_surrogate=2, + ocba_delta=5, + verbose=True + ) + # Simulate optimization state (normally done in optimize()) + opt.mean_X = np.array([[1, 2], [0, 0], [2, 1]]) + opt.mean_y = np.array([5.0, 0.1, 5.0]) + opt.var_y = np.array([0.1, 0.05, 0.15]) + X_ocba = opt.apply_ocba() + # OCBA: Adding 5 re-evaluation(s). + # The following should be true: + print(X_ocba.shape[0] == 5) + ``` + + OCBA skipped - insufficient points + ```{python} + import numpy as np + from spotoptim import SpotOptim + opt2 = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + repeats_surrogate=2, + ocba_delta=5, + verbose=True + ) + opt2.mean_X = np.array([[1, 2], [0, 0]]) + opt2.mean_y = np.array([5.0, 0.1]) + opt2.var_y = np.array([0.1, 0.05]) + X_ocba = opt2.apply_ocba() + # Warning: OCBA skipped (need >2 points with variance > 0) + print(X_ocba is None) + ``` + """ + # OCBA: Compute optimal budget allocation for noisy functions + # This determines which existing design points should be re-evaluated + X_ocba = None + if ( + self.repeats_initial > 1 or self.repeats_surrogate > 1 + ) and self.ocba_delta > 0: + # Check conditions for OCBA (need variance > 0 and at least 3 points) + if not np.all(self.var_y > 0) and (self.mean_X.shape[0] <= 2): + if self.verbose: + print("Warning: OCBA skipped (need >2 points with variance > 0)") + elif np.all(self.var_y > 0) and (self.mean_X.shape[0] > 2): + # Get OCBA allocation + X_ocba = self.get_ocba_X( + self.mean_X, + self.mean_y, + self.var_y, + self.ocba_delta, + verbose=self.verbose, + ) + if self.verbose and X_ocba 
is not None: + print(f" OCBA: Adding {X_ocba.shape[0]} re-evaluation(s)") + + return X_ocba + + def get_ranks(self, x: np.ndarray) -> np.ndarray: """Returns ranks of numbers within input array x. @@ -3721,87 +3871,14 @@ def get_ocba_X( else: return None - def evaluate_function(self, X: np.ndarray) -> np.ndarray: - """Evaluate objective function at points X. - Used in the optimize() method to evaluate the objective function. - - Input Space: `X` is expected in Transformed and Mapped Space (Internal scale, Reduced dimensions). - Process as follows: - 1. Expands `X` to Transformed Space (Full dimensions) if dimension reduction is active. - 2. Inverse transforms `X` to Natural Space (Original scale). - 3. Evaluates the user function with points in Natural Space. - - If dimension reduction is active, expands `X` to full dimensions before evaluation. - Supports both single-objective and multi-objective functions. For multi-objective - functions, converts to single-objective using `mo2so` method. - - Args: - X (ndarray): Points to evaluate in Transformed and Mapped Space, shape (n_samples, n_reduced_features). - - Returns: - ndarray: Function values, shape (n_samples,). 
- - Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - # Single-objective function - opt_so = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - max_iter=10, - n_initial=5 - ) - X = np.array([[1.0, 2.0], [3.0, 4.0]]) - y = opt_so.evaluate_function(X) - print(f"Single-objective output: {y}") - ``` - - ```{python} - import numpy as np - from spotoptim import SpotOptim - # Multi-objective function (default: use first objective) - opt_mo = SpotOptim( - fun=lambda X: np.column_stack([ - np.sum(X**2, axis=1), - np.sum((X-1)**2, axis=1) - ]), - bounds=[(-5, 5), (-5, 5)], - max_iter=10, - n_initial=5 - ) - y_mo = opt_mo.evaluate_function(X) - print(f"Multi-objective output (first obj): {y_mo}") - ``` - """ - # Ensure X is 2D - X = np.atleast_2d(X) - - # Expand to full dimensions if needed - if self.red_dim: - X = self.to_all_dim(X) - - # Apply inverse transformations to get original scale for function evaluation - X_original = self.inverse_transform_X(X) - - # Map factor variables to original string values - X_for_eval = self.map_to_factor_values(X_original) - - # Evaluate function - y_raw = self.fun(X_for_eval, *self.args, **self.kwargs) - - # Convert to numpy array if needed - if not isinstance(y_raw, np.ndarray): - y_raw = np.array([y_raw]) - - # Handle multi-objective case - y = self.mo2so(y_raw) - - # Ensure y is 1D - if y.ndim > 1: - y = y.ravel() - - return y + # ==================== + # TASK_SELECT: + # select_distant_points() + # select_best_cluster() + # _selection_dispatcher() + # select_new() + # suggest_next_infill_point() + # ==================== def select_distant_points( self, X: np.ndarray, y: np.ndarray, k: int @@ -3993,22 +4070,133 @@ def sphere(X): ind = is_duplicate return A[~ind], ~ind - def optimize_acquisition_func(self) -> np.ndarray: - """Optimize the acquisition function to find the next point to evaluate. - Returns: - ndarray: The optimized point(s). 
def suggest_next_infill_point(self) -> np.ndarray:
    """Suggest the next infill point(s) to evaluate (dispatcher).

    Used in both sequential and parallel optimization loops. Orchestrates
    candidate generation from the acquisition-function optimizer, falls back
    to a retry strategy on failure, and guarantees that point(s) are
    returned. The returned points live in the Transformed and Mapped Space
    (internal optimization space): transformations (e.g. log, sqrt) have
    been applied and fixed variables removed.

    Process:
        1. Collect candidates from the acquisition-function optimizer.
        2. Fill remaining slots via the fallback strategy.
        3. As a last resort, return the last attempted (duplicate) point.

    Returns:
        ndarray: Next point(s) to evaluate in Transformed and Mapped Space,
            shape (n_infill_points, n_features). May contain fewer rows than
            n_infill_points when no further unique candidates could be found.
    """
    candidates = []
    # Fix: initialize x_last so the duplicate-return path below cannot hit
    # an unbound local when the fallback loop body never executes.
    x_last = None

    # 1. Candidates from the acquisition-function optimizer.
    opt_candidates = self._try_optimizer_candidates(
        n_needed=self.n_infill_points, current_batch=candidates
    )
    candidates.extend(opt_candidates)

    if len(candidates) >= self.n_infill_points:
        return np.vstack(candidates)

    # 2. Fallback strategy: one attempt per loop iteration; the current
    # batch is passed along so duplicates are avoided.
    while len(candidates) < self.n_infill_points:
        cand, x_last = self._try_fallback_strategy(
            max_attempts=10, current_batch=candidates
        )
        if cand is not None:
            candidates.append(cand)
        else:
            # Fallback exhausted its retries without a unique point;
            # return the partial batch rather than loop forever.
            if self.verbose:
                print(
                    "Warning: Could not fill all infill points with unique candidates."
                )
            break

    if len(candidates) > 0:
        return np.vstack(candidates)

    # 3. Nothing unique found at all: duplicate the last attempt.
    if self.verbose:
        print(
            "Warning: Could not find unique point after optimization candidates and fallback attempts. "
            "Returning last candidate (duplicate)."
        )

    if x_last is None:
        # No fallback attempt was recorded; delegate to the failure handler.
        x_next = self._handle_acquisition_failure()
        return x_next.reshape(1, -1)

    return x_last.reshape(1, -1)
def determine_termination(self, timeout_start: float) -> str:
    """Return a message describing why optimization stopped.

    Termination conditions are checked in priority order:
        1. Maximum number of evaluations reached.
        2. Maximum time limit exceeded.
        3. Normal completion (neither limit reached).

    Args:
        timeout_start (float): Start time of optimization (from time.time()).

    Returns:
        str: Human-readable termination reason.
    """
    if len(self.y_) >= self.max_iter:
        return f"Optimization terminated: maximum evaluations ({self.max_iter}) reached"
    if time.time() - timeout_start >= self.max_time * 60:
        return f"Optimization terminated: time limit ({self.max_time:.2f} min) reached"
    return "Optimization finished successfully"
def apply_penalty_NA(
    self,
    y: np.ndarray,
    y_history: Optional[np.ndarray] = None,
    penalty_value: Optional[float] = None,
    sd: float = 0.1,
) -> np.ndarray:
    """Replace NaN and infinite values with a penalty plus random noise.

    Used after function evaluations in the optimize() loop. Follows the
    approach of spotpython.utils.repair.apply_penalty_NA: NaN/inf values
    are replaced by a penalty value plus small random noise so that
    repaired values are not identical.

    Args:
        y (ndarray): Objective values to be repaired. Non-convertible
            entries (e.g. strings, None) are treated as NaN.
        y_history (ndarray, optional): Historical objective values used for
            computing penalty statistics. If None, uses y itself.
        penalty_value (float, optional): Replacement value for NaN/inf.
            If None, computed adaptively as
            max(finite_history) + 3 * std(finite_history); falls back to
            self.penalty_val (or related defaults) when fewer than two
            finite values are available.
        sd (float): Standard deviation of the normal noise added to the
            penalty. Default 0.1.

    Returns:
        ndarray: y with NaN/inf replaced by penalty_value + noise.
    """

    def _safe_float(v):
        # Maps non-convertible values (e.g. "error", None) to NaN.
        try:
            return float(v)
        except (ValueError, TypeError):
            return np.nan

    # Coerce y to a flat float array so isfinite cannot crash on objects.
    y = np.array([_safe_float(v) for v in np.asarray(y).flatten()])
    mask = ~np.isfinite(y)

    if np.any(mask):
        n_bad = np.sum(mask)

        if penalty_value is None:
            if y_history is not None:
                # Fix: convert the history with the same safe-float mapping
                # so a list or object-dtype history cannot crash np.isfinite.
                hist = np.array(
                    [_safe_float(v) for v in np.asarray(y_history).flatten()]
                )
                finite_values = hist[np.isfinite(hist)]
            else:
                finite_values = y[~mask]

            if len(finite_values) >= 2:
                # Adaptive penalty: worst observed value plus 3 standard deviations.
                max_y = np.max(finite_values)
                std_y = np.std(finite_values, ddof=1)
                penalty_value = max_y + 3.0 * std_y

                if self.verbose:
                    print(
                        f"Warning: Found {n_bad} NaN/inf value(s), replacing with "
                        f"adaptive penalty (max + 3*std = {penalty_value:.4f})"
                    )
            else:
                # Too few finite values for statistics; fall back.
                if self.penalty_val is not None:
                    penalty_value = self.penalty_val
                elif len(finite_values) == 1:
                    # Single finite value plus a large constant.
                    penalty_value = finite_values[0] + 1000.0
                else:
                    # Everything is NaN/inf: large default.
                    penalty_value = 1e10

                if self.verbose:
                    print(
                        f"Warning: Found {n_bad} NaN/inf value(s), insufficient finite values "
                        f"for adaptive penalty. Using penalty_value = {penalty_value}"
                    )
        else:
            if self.verbose:
                print(
                    f"Warning: Found {n_bad} NaN/inf value(s), replacing with {penalty_value} + noise"
                )

        # Add normal noise so repaired values are not identical.
        random_noise = self.rng.normal(0, sd, y.shape)
        penalty_values = penalty_value + random_noise
        y[mask] = penalty_values[mask]

    return y
+ + Args: + x_next_repeated (ndarray): Design points that were evaluated in transformed space, + shape (n_eval, n_features). + y_next (ndarray): Function values at x_next_repeated, shape (n_eval,). Examples: >>> import numpy as np @@ -4398,218 +4693,540 @@ def _initialize_run( ... fun=lambda X: np.sum(X**2, axis=1), ... bounds=[(-5, 5), (-5, 5)], ... n_initial=5, - ... seed=0, - ... x0=np.array([0.0, 0.0]), ... verbose=True ... ) - >>> X0, y0 = opt._initialize_run(X0=None, y0_known=None) - >>> X0.shape - (5, 2) - >>> np.allclose(y0, np.sum(X0**2, axis=1)) - True + >>> # Simulate optimization state + >>> opt.n_iter_ = 1 + >>> opt.best_x_ = np.array([1.0, 1.0]) + >>> opt.best_y_ = 2.0 + >>> + >>> # Case 1: New best found + >>> x_new = np.array([[0.1, 0.1], [0.5, 0.5]]) + >>> y_new = np.array([0.02, 0.5]) + >>> opt._update_best_main_loop(x_new, y_new) + Iteration 1: New best f(x) = 0.020000 + >>> opt.best_y_ + 0.02 + >>> + >>> # Case 2: No improvement + >>> opt.n_iter_ = 2 + >>> x_no_improve = np.array([[1.5, 1.5]]) + >>> y_no_improve = np.array([4.5]) + >>> opt._update_best_main_loop(x_no_improve, y_no_improve) + Iteration 2: f(x) = 4.500000 + >>> + >>> # Case 3: With noisy function + >>> opt_noise = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... bounds=[(-5, 5), (-5, 5)], + ... repeats_surrogate=2, + ... verbose=True + ... 
) + >>> opt_noise.n_iter_ = 1 + >>> opt_noise.best_y_ = 2.0 + >>> opt_noise.min_mean_y = 1.5 + >>> y_noise = np.array([0.5]) + >>> x_noise = np.array([[0.5, 0.5]]) + >>> opt_noise._update_best_main_loop(x_noise, y_noise) + Iteration 1: New best f(x) = 0.500000, mean best: f(x) = 1.500000 """ - # Set seed for reproducibility - self.set_seed() + # Update best + # Determine global best value for printing if shared variable exists + global_best_val = None + if hasattr(self, "shared_best_y") and self.shared_best_y is not None: + # Sync with global shared value + lock_obj = getattr(self, "shared_lock", None) + if lock_obj is not None: + with lock_obj: + if ( + self.best_y_ is not None + and self.best_y_ < self.shared_best_y.value + ): + self.shared_best_y.value = self.best_y_ - # Set initial design (generate or process user-provided points) - X0 = self.get_initial_design(X0) + min_y_next = np.min(y_next) + if min_y_next < self.shared_best_y.value: + self.shared_best_y.value = min_y_next - # Curate initial design (remove duplicates, generate additional points if needed, repeat if necessary) - X0 = self.curate_initial_design(X0) + global_best_val = self.shared_best_y.value - # Evaluate initial design - if y0_known is not None and self.x0 is not None: - # Identify injected point to skip evaluation - dists = np.linalg.norm(X0 - self.x0, axis=1) - # Use a small tolerance for matching - matches = dists < 1e-9 + current_best = np.min(y_next) + if current_best < self.best_y_: + best_idx_in_new = np.argmin(y_next) + # x_next_repeated is in transformed space, convert to original for storage + self.best_x_ = self.inverse_transform_X( + x_next_repeated[best_idx_in_new].reshape(1, -1) + )[0] + self.best_y_ = current_best - if np.any(matches): - if self.verbose: - print("Skipping re-evaluation of injected best point.") + if self.verbose: + # Calculate progress + if self.max_time != np.inf and start_time is not None: + progress = (time.time() - start_time) / (self.max_time * 60) * 100 
+ progress_str = f"Time: {progress:.1f}%" + else: + prev_evals = sum(res.nfev for res in self.restarts_results_) + progress = (prev_evals + self.counter) / self.max_iter * 100 + progress_str = f"Evals: {progress:.1f}%" - # Initialize y0 - y0 = np.empty(len(X0)) - y0[:] = np.nan + msg = f"Iter {self.n_iter_}" + if global_best_val is not None: + msg += f" | GlobalBest: {global_best_val:.6f}" + msg += f" | Best: {self.best_y_:.6f} | Rate: {self.success_rate:.2f} | {progress_str}" - # Set known values - y0[matches] = y0_known + if (self.repeats_initial > 1) or (self.repeats_surrogate > 1): + msg += f" | Mean Best: {self.min_mean_y:.6f}" - # Evaluate others - not_matches = ~matches - if np.any(not_matches): - y0_others = self.evaluate_function(X0[not_matches]) - y0[not_matches] = y0_others + print(msg) + elif self.verbose: + if self.max_time != np.inf and start_time is not None: + progress = (time.time() - start_time) / (self.max_time * 60) * 100 + progress_str = f"Time: {progress:.1f}%" else: - # Injected point lost during curation? Should not happen if it was unique - y0 = self.evaluate_function(X0) - else: - y0 = self.evaluate_function(X0) - - return X0, y0 - + prev_evals = sum(res.nfev for res in self.restarts_results_) + progress = (prev_evals + self.counter) / self.max_iter * 100 + progress_str = f"Evals: {progress:.1f}%" + current_val = np.min(y_next) + msg = f"Iter {self.n_iter_}" + if global_best_val is not None: + msg += f" | GlobalBest: {global_best_val:.6f}" + msg += f" | Best: {self.best_y_:.6f} | Curr: {current_val:.6f} | Rate: {self.success_rate:.2f} | {progress_str}" + if (self.repeats_initial > 1) or (self.repeats_surrogate > 1): + mean_y_new = np.mean(y_next) + msg += f" | Mean Curr: {mean_y_new:.6f}" + print(msg) - def update_repeats_infill_points(self, x_next: np.ndarray) -> np.ndarray: - """Repeat infill point for noisy function evaluation. Used in the sequential_loop. 
- For noisy objective functions (repeats_surrogate > 1), creates multiple - copies of the suggested point for repeated evaluation. Otherwise, returns - the point in 2D array format. + def _handle_NA_new_points( + self, x_next: np.ndarray, y_next: np.ndarray + ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: + """Handle NaN/inf values in new evaluation points. + Applies penalties to NaN/inf values and removes any remaining invalid points. + If all evaluations are invalid, returns None for both arrays to signal that + the iteration should be skipped. Args: - x_next (ndarray): Next point to evaluate, shape (n_features,). + x_next (ndarray): Design points that were evaluated, shape (n_eval, n_features). + y_next (ndarray): Function values at x_next, shape (n_eval,). Returns: - ndarray: Points to evaluate, shape (repeats_surrogate, n_features) - or (1, n_features) if repeats_surrogate == 1. - - Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - from spotoptim.function import sphere, noisy_sphere - # Without repeats - - opt = SpotOptim( - fun=sphere, - bounds=[(-5, 5), (-5, 5)], - repeats_surrogate=1 - ) - x_next = np.array([1.0, 2.0]) - x_repeated = opt.update_repeats_infill_points(x_next) - print(x_repeated.shape) + Tuple[Optional[ndarray], Optional[ndarray]]: Tuple of (x_clean, y_clean). + Both are None if all evaluations were NaN/inf (iteration should be skipped). + Otherwise returns filtered arrays with only finite values. - # With repeats for noisy function - opt_noisy = SpotOptim( - fun=noisy_sphere, - bounds=[(-5, 5), (-5, 5)], - repeats_surrogate=3 - ) - x_next = np.array([1.0, 2.0]) - x_repeated = opt_noisy.update_repeats_infill_points(x_next) - print(x_repeated.shape) - # All three copies should be identical - np.all(x_repeated[0] == x_repeated[1]) - ``` + Examples: + >>> import numpy as np + >>> from spotoptim import SpotOptim + >>> opt = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... 
bounds=[(-5, 5), (-5, 5)], + ... n_initial=5, + ... verbose=True + ... ) + >>> # Simulate optimization state + >>> opt.y_ = np.array([1.0, 2.0, 3.0]) # Historical values + >>> opt.n_iter_ = 1 + >>> + >>> # Case 1: Some valid values + >>> x_next = np.array([[1, 2], [3, 4], [5, 6]]) + >>> y_next = np.array([5.0, np.nan, 10.0]) + >>> x_clean, y_clean = opt._handle_NA_new_points(x_next, y_next) + >>> x_clean.shape + (2, 2) + >>> y_clean.shape + (2,) + >>> + >>> # Case 2: All NaN/inf - should skip iteration + >>> x_all_bad = np.array([[1, 2], [3, 4]]) + >>> y_all_bad = np.array([np.nan, np.inf]) + >>> x_clean, y_clean = opt._handle_NA_new_points(x_all_bad, y_all_bad) + Warning: All new evaluations were NaN/inf, skipping iteration 1 + >>> x_clean is None + True + >>> y_clean is None + True """ - if x_next.ndim == 1: - x_next = x_next.reshape(1, -1) + # Handle NaN/inf values in new evaluations + # Use historical y values (self.y_) for computing penalty statistics + if self.penalty: + y_next = self.apply_penalty_NA(y_next, y_history=self.y_) - if self.repeats_surrogate > 1: - # Repeat each row repeats_surrogate times - # Note: np.repeat with axis=0 repeats rows [r1, r1, r2, r2...] - x_next_repeated = np.repeat(x_next, self.repeats_surrogate, axis=0) - else: - x_next_repeated = x_next - return x_next_repeated + # Identify which points are valid (finite) BEFORE removing them + # Note: remove_nan filters based on y_next finite values + # Ensure y_next is a float array (maps non-convertible values like "error" or None to nan) + # This is critical if the objective function returns non-numeric values and penalty=False + if y_next.dtype == object: + # Use safe float conversion similar to apply_penalty_NA + def _safe_float(v): + try: + return float(v) + except (ValueError, TypeError): + return np.nan - def _run_sequential_loop( - self, timeout_start: float, effective_max_iter: int - ) -> Tuple[str, OptimizeResult]: - """Execute the main sequential optimization loop. 
+ # Reconstruct as float array + y_flat = np.array(y_next).flatten() + y_next = np.array([_safe_float(v) for v in y_flat]) - Args: - timeout_start (float): Start time for timeout. - effective_max_iter (int): Maximum number of iterations for this run (may be overridden for restarts). + finite_mask = np.isfinite(y_next) - Returns: - Tuple[str, OptimizeResult]: Tuple containing status and optimization result. + X_next_clean, y_next_clean = self.remove_nan( + x_next, y_next, stop_on_zero_return=False + ) - Raises: - ValueError: - If excessive consecutive failures occur (e.g., due to NaN/inf values in evaluations), indicating a potential issue with the objective function. + # If we have multi-objective values, we need to filter them too + # The new MO values were appended to self.y_mo in evaluate_function -> mo2so -> store_mo + # So self.y_mo currently contains the INVALID points at the end. + if self.y_mo is not None: + n_new = len(y_next) + # Check if y_mo has the new points appended + if len(self.y_mo) >= n_new: + # The new points are at the end of y_mo + y_mo_new = self.y_mo[-n_new:] + y_mo_old = self.y_mo[:-n_new] - Examples: - >>> import time - >>> import numpy as np - >>> from spotoptim import SpotOptim - >>> opt = SpotOptim( - ... fun=lambda X: np.sum(X**2, axis=1), - ... bounds=[(-5, 5), (-5, 5)], - ... n_initial=5, - ... max_iter=10, - ... seed=0, - ... n_jobs=1, # Use sequential optimization for deterministic output - ... verbose=True - ... 
def optimize_sequential_run(
    self,
    timeout_start: float,
    X0: Optional[np.ndarray] = None,
    y0_known: Optional[float] = None,
    max_iter_override: Optional[int] = None,
    shared_best_y=None,
    shared_lock=None,
) -> Tuple[str, OptimizeResult]:
    """Perform a single sequential optimization run.

    Pipeline: _initialize_run -> rm_initial_design_NA_values ->
    check_size_initial_design -> init_storage -> get_best_xy_initial_design
    -> _run_sequential_loop.

    Args:
        timeout_start (float): Start time for timeout.
        X0 (ndarray, optional): Initial design points in Natural Space,
            shape (n_initial, n_features).
        y0_known (float, optional): Known best value for the initial design.
        max_iter_override (int, optional): Overrides the maximum number of
            iterations (used e.g. for restarts).
        shared_best_y: Shared best value for parallel runs.
        shared_lock: Shared lock for parallel runs.

    Returns:
        Tuple[str, OptimizeResult]: Run status and optimization result.

    Raises:
        ValueError: If the initial design has no valid points after removing
            NaN/inf values, or is too small to proceed.
    """
    # Keep references to the parallel-run coordination objects (may be None).
    self.shared_best_y = shared_best_y
    self.shared_lock = shared_lock

    # Seed, build, and evaluate the initial design.
    X0, y0 = self._initialize_run(X0, y0_known)

    # Drop invalid (NaN/inf) initial points, then validate the remainder.
    X0, y0, n_evaluated = self.rm_initial_design_NA_values(X0, y0)
    self.check_size_initial_design(y0, n_evaluated)

    # Reset storage and per-run statistics.
    self.init_storage(X0, y0)
    self._zero_success_count = 0
    self._success_history = []
    self.update_stats()

    # TensorBoard logging of the initial design.
    self._init_tensorboard()

    # Determine and report the initial best point.
    self.get_best_xy_initial_design()

    # Hand over to the main sequential loop.
    effective_max_iter = (
        self.max_iter if max_iter_override is None else max_iter_override
    )
    return self._run_sequential_loop(timeout_start, effective_max_iter)
if max_iter_override is not None else self.max_iter + ) + return self._run_sequential_loop(timeout_start, effective_max_iter) + + def _initialize_run( + self, X0: Optional[np.ndarray], y0_known: Optional[float] + ) -> Tuple[np.ndarray, np.ndarray]: + """Initialize optimization run: seed, design generation, initial evaluation. + Called from optimize_sequential_run (sequential path only). + + Args: + X0 (Optional[np.ndarray]): Initial design points in Natural Space, shape (n_initial, n_features). + y0_known (Optional[float]): Known best value for initial design. + + Returns: + Tuple[np.ndarray, np.ndarray]: Tuple containing initial design and corresponding objective values. + + Raises: + ValueError: If the initial design has no valid points after removing NaN/inf values, or if the initial design is too small to proceed. + + Examples: + >>> import numpy as np + >>> from spotoptim import SpotOptim + >>> opt = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... bounds=[(-5, 5), (-5, 5)], + ... n_initial=5, + ... seed=0, + ... x0=np.array([0.0, 0.0]), + ... verbose=True + ... 
) + >>> X0, y0 = opt._initialize_run(X0=None, y0_known=None) + >>> X0.shape + (5, 2) + >>> np.allclose(y0, np.sum(X0**2, axis=1)) + True + """ + # Set seed for reproducibility + self.set_seed() + + # Set initial design (generate or process user-provided points) + X0 = self.get_initial_design(X0) + + # Curate initial design (remove duplicates, generate additional points if needed, repeat if necessary) + X0 = self.curate_initial_design(X0) + + # Evaluate initial design + if y0_known is not None and self.x0 is not None: + # Identify injected point to skip evaluation + dists = np.linalg.norm(X0 - self.x0, axis=1) + # Use a small tolerance for matching + matches = dists < 1e-9 + + if np.any(matches): + if self.verbose: + print("Skipping re-evaluation of injected best point.") + + # Initialize y0 + y0 = np.empty(len(X0)) + y0[:] = np.nan + + # Set known values + y0[matches] = y0_known + + # Evaluate others + not_matches = ~matches + if np.any(not_matches): + y0_others = self.evaluate_function(X0[not_matches]) + y0[not_matches] = y0_others + else: + # Injected point lost during curation? Should not happen if it was unique + y0 = self.evaluate_function(X0) + else: + y0 = self.evaluate_function(X0) + + return X0, y0 + + + def update_repeats_infill_points(self, x_next: np.ndarray) -> np.ndarray: + """Repeat infill point for noisy function evaluation. Used in the sequential_loop. + For noisy objective functions (repeats_surrogate > 1), creates multiple + copies of the suggested point for repeated evaluation. Otherwise, returns + the point in 2D array format. + + Args: + x_next (ndarray): Next point to evaluate, shape (n_features,). + + Returns: + ndarray: Points to evaluate, shape (repeats_surrogate, n_features) + or (1, n_features) if repeats_surrogate == 1. 
+ + Examples: + ```{python} + import numpy as np + from spotoptim import SpotOptim + from spotoptim.function import sphere, noisy_sphere + # Without repeats + + opt = SpotOptim( + fun=sphere, + bounds=[(-5, 5), (-5, 5)], + repeats_surrogate=1 + ) + x_next = np.array([1.0, 2.0]) + x_repeated = opt.update_repeats_infill_points(x_next) + print(x_repeated.shape) + + # With repeats for noisy function + opt_noisy = SpotOptim( + fun=noisy_sphere, + bounds=[(-5, 5), (-5, 5)], + repeats_surrogate=3 + ) + x_next = np.array([1.0, 2.0]) + x_repeated = opt_noisy.update_repeats_infill_points(x_next) + print(x_repeated.shape) + # All three copies should be identical + np.all(x_repeated[0] == x_repeated[1]) + ``` + """ + if x_next.ndim == 1: + x_next = x_next.reshape(1, -1) + + if self.repeats_surrogate > 1: + # Repeat each row repeats_surrogate times + # Note: np.repeat with axis=0 repeats rows [r1, r1, r2, r2...] + x_next_repeated = np.repeat(x_next, self.repeats_surrogate, axis=0) + else: + x_next_repeated = x_next + return x_next_repeated + + + def _run_sequential_loop( + self, timeout_start: float, effective_max_iter: int + ) -> Tuple[str, OptimizeResult]: + """Execute the main sequential optimization loop. + + Args: + timeout_start (float): Start time for timeout. + effective_max_iter (int): Maximum number of iterations for this run (may be overridden for restarts). + + Returns: + Tuple[str, OptimizeResult]: Tuple containing status and optimization result. + + Raises: + ValueError: + If excessive consecutive failures occur (e.g., due to NaN/inf values in evaluations), indicating a potential issue with the objective function. + + Examples: + >>> import time + >>> import numpy as np + >>> from spotoptim import SpotOptim + >>> opt = SpotOptim( + ... fun=lambda X: np.sum(X**2, axis=1), + ... bounds=[(-5, 5), (-5, 5)], + ... n_initial=5, + ... max_iter=10, + ... seed=0, + ... n_jobs=1, # Use sequential optimization for deterministic output + ... verbose=True + ... 
) + >>> X0, y0 = opt._initialize_run(X0=None, y0_known=None) + >>> X0, y0, n_evaluated = opt.rm_initial_design_NA_values(X0, y0) + >>> opt.check_size_initial_design(y0, n_evaluated) + >>> opt.init_storage(X0, y0) + >>> opt._zero_success_count = 0 + >>> opt._success_history = [] + >>> opt.update_stats() + >>> opt.get_best_xy_initial_design() + >>> status, result = opt._run_sequential_loop(timeout_start=time.time(), effective_max_iter=10) + >>> print(status) + FINISHED + >>> print(result.message.splitlines()[0]) + Optimization terminated: maximum evaluations (10) reached + """ + consecutive_failures = 0 + + while (len(self.y_) < effective_max_iter) and ( + time.time() < timeout_start + self.max_time * 60 + ): + # Check for excessive consecutive failures (infinite loop prevention) + if consecutive_failures > self.max_iter: + msg = ( + f"Optimization stopped due to {consecutive_failures} consecutive " + "invalid evaluations (NaN/inf). Check your objective function." + ) + if self.verbose: + print(f"Warning: {msg}") + return "FINISHED", OptimizeResult( + x=self.best_x_, + fun=self.best_y_, + nfev=len(self.y_), + nit=self.n_iter_, + success=False, + message=msg, + X=self.X_, + y=self.y_, + ) + + # Increment iteration counter. This is not the same as number of function evaluations. 
+ self.n_iter_ += 1 + + # Fit surrogate (use mean_y if noise, otherwise y_) + self._fit_scheduler() + + # Apply OCBA for noisy functions + X_ocba = self.apply_ocba() + + # Suggest next point + x_next = self.suggest_next_infill_point() + + # Repeat next point if repeats_surrogate > 1 + x_next_repeated = self.update_repeats_infill_points(x_next) + + # Append OCBA points to new design points (if applicable) + if X_ocba is not None: + x_next_repeated = append(X_ocba, x_next_repeated, axis=0) + + # Evaluate next point(s) including OCBA points + y_next = self.evaluate_function(x_next_repeated) + + # Handle NaN/inf values in new evaluations + x_next_repeated, y_next = self._handle_NA_new_points( + x_next_repeated, y_next + ) + if x_next_repeated is None: + consecutive_failures += 1 + continue # Skip iteration if all evaluations were invalid + + # Reset failure counter if we got valid points + consecutive_failures = 0 + + # Update success rate BEFORE updating storage (so it compares against previous best) + self.update_success_rate(y_next) + + # Check for restart + if self.success_rate == 0.0: self._zero_success_count += 1 else: self._zero_success_count = 0 @@ -4705,7 +5322,13 @@ def _run_sequential_loop( y=self.y_, ) - def _update_storage_steady(self, x, y): + # ==================== + # TASK_OPTIM_PARALLEL: + # _update_storage_steady() + # optimize_steady_state() + # ==================== + + def _update_storage_steady(self, x, y): """Helper to safely append single point (for steady state). This method is designed for the steady-state parallel optimization scenario, where new points are evaluated and returned asynchronously. 
It safely appends new points to the existing storage of evaluated points and their function values, @@ -5136,648 +5759,122 @@ def _batch_ready() -> bool: * 100 ) progress_str = f"Time: {prog_val:.1f}%" - else: - prog_val = ( - len(self.y_) / effective_max_iter * 100 - ) - progress_str = f"Evals: {prog_val:.1f}%" - - print( - f"Iter {len(self.y_)}/{effective_max_iter}" - f" | Best: {self.best_y_:.6f}" - f" | Rate: {self.success_rate:.2f}" - f" | {progress_str}" - ) - - # Single surrogate refit for the whole batch — - # held under the lock so in-flight search threads - # do not read a partially-updated model. - with _surrogate_lock: - self._fit_scheduler() - - except Exception as e: - _future_n_pts.pop(fut, None) - if self.verbose: - print(f"Error processing future: {e}") - - return "FINISHED", OptimizeResult( - x=self.best_x_, - fun=self.best_y_, - nfev=len(self.y_), - nit=self.n_iter_, - success=True, - message="Optimization finished (Steady State)", - X=self.X_, - y=self.y_, - ) - - def suggest_next_infill_point(self) -> np.ndarray: - """Suggest next point to evaluate (dispatcher). - The returned point is in the Transformed and Mapped Space (Internal Optimization Space). - This means: - 1. Transformations (e.g., log, sqrt) have been applied. - 2. Dimension reduction has been applied (fixed variables removed). - Process: - 1. Try candidates from acquisition function optimizer. - 2. Handle acquisition failure (fallback). - 3. Return last attempt if all fails. - - - Returns: - ndarray: Next point(s) to evaluate in Transformed and Mapped Space. - Shape is (n_infill_points, n_features). 
- - Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - def sphere(X): - X = np.atleast_2d(X) - return np.sum(X**2, axis=1) - opt = SpotOptim( - fun=sphere, - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - n_infill_points=2 - ) - # Need to initialize optimization state (X_, y_, surrogate) - # Normally done inside optimize() - np.random.seed(0) - opt.X_ = np.random.rand(10, 2) - opt.y_ = np.random.rand(10) - opt._fit_surrogate(opt.X_, opt.y_) - x_next = opt.suggest_next_infill_point() - x_next.shape - ``` - """ - # 1. Optimizer candidates - candidates = [] - opt_candidates = self._try_optimizer_candidates( - n_needed=self.n_infill_points, current_batch=candidates - ) - candidates.extend(opt_candidates) - - if len(candidates) >= self.n_infill_points: - return np.vstack(candidates) - - # 2. Try fallback strategy to fill remaining slots - while len(candidates) < self.n_infill_points: - # Just try one attempt at a time but loop - # We pass current batch to avoid dups - cand, x_last = self._try_fallback_strategy( - max_attempts=10, current_batch=candidates - ) - if cand is not None: - candidates.append(cand) - else: - # Fallback failed to find unique point even after retries - # Break and fill with last attempts or just return what we have? - # If we return partial batch, we might fail downstream if code expects n points? - # Actually code should handle any number of points returned by this method? - # Or duplicate valid points? - # Warn and use duplicate if absolutely necessary? - if self.verbose: - print( - "Warning: Could not fill all infill points with unique candidates." - ) - break - - if len(candidates) > 0: - return np.vstack(candidates) - - # 3. Return last attempt (duplicate) if absolutely nothing found - # This returns a single point (1, d). - # Should we return n copies? 
- # If n_infill_points > 1, we should probably output (n, d) - - if self.verbose: - print( - "Warning: Could not find unique point after optimization candidates and fallback attempts. " - "Returning last candidate (duplicate)." - ) - - # Verify x_last is not None - if x_last is None: - # Should practically not happen - x_next = self._handle_acquisition_failure() - return x_next.reshape(1, -1) - - # Return duplicated x_last to fill n_infill_points? OR just 1? - # Let's return 1 and let loop repeat it? - # But loop repeats based on x_next logic. - # If we return 1 point, it is treated as 1 point. - # If user asked for n_infill_points, maybe we should just return what we have (1 duplicated). - - return x_last.reshape(1, -1) - - def _handle_NA_new_points( - self, x_next: np.ndarray, y_next: np.ndarray - ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: - """Handle NaN/inf values in new evaluation points. - Applies penalties to NaN/inf values and removes any remaining invalid points. - If all evaluations are invalid, returns None for both arrays to signal that - the iteration should be skipped. - - Args: - x_next (ndarray): Design points that were evaluated, shape (n_eval, n_features). - y_next (ndarray): Function values at x_next, shape (n_eval,). - - Returns: - Tuple[Optional[ndarray], Optional[ndarray]]: Tuple of (x_clean, y_clean). - Both are None if all evaluations were NaN/inf (iteration should be skipped). - Otherwise returns filtered arrays with only finite values. - - Examples: - >>> import numpy as np - >>> from spotoptim import SpotOptim - >>> opt = SpotOptim( - ... fun=lambda X: np.sum(X**2, axis=1), - ... bounds=[(-5, 5), (-5, 5)], - ... n_initial=5, - ... verbose=True - ... 
) - >>> # Simulate optimization state - >>> opt.y_ = np.array([1.0, 2.0, 3.0]) # Historical values - >>> opt.n_iter_ = 1 - >>> - >>> # Case 1: Some valid values - >>> x_next = np.array([[1, 2], [3, 4], [5, 6]]) - >>> y_next = np.array([5.0, np.nan, 10.0]) - >>> x_clean, y_clean = opt._handle_NA_new_points(x_next, y_next) - >>> x_clean.shape - (2, 2) - >>> y_clean.shape - (2,) - >>> - >>> # Case 2: All NaN/inf - should skip iteration - >>> x_all_bad = np.array([[1, 2], [3, 4]]) - >>> y_all_bad = np.array([np.nan, np.inf]) - >>> x_clean, y_clean = opt._handle_NA_new_points(x_all_bad, y_all_bad) - Warning: All new evaluations were NaN/inf, skipping iteration 1 - >>> x_clean is None - True - >>> y_clean is None - True - """ - # Handle NaN/inf values in new evaluations - # Use historical y values (self.y_) for computing penalty statistics - if self.penalty: - y_next = self.apply_penalty_NA(y_next, y_history=self.y_) - - # Identify which points are valid (finite) BEFORE removing them - # Note: remove_nan filters based on y_next finite values - - # Ensure y_next is a float array (maps non-convertible values like "error" or None to nan) - # This is critical if the objective function returns non-numeric values and penalty=False - if y_next.dtype == object: - # Use safe float conversion similar to apply_penalty_NA - def _safe_float(v): - try: - return float(v) - except (ValueError, TypeError): - return np.nan - - # Reconstruct as float array - y_flat = np.array(y_next).flatten() - y_next = np.array([_safe_float(v) for v in y_flat]) - - finite_mask = np.isfinite(y_next) - - X_next_clean, y_next_clean = self.remove_nan( - x_next, y_next, stop_on_zero_return=False - ) - - # If we have multi-objective values, we need to filter them too - # The new MO values were appended to self.y_mo in evaluate_function -> mo2so -> store_mo - # So self.y_mo currently contains the INVALID points at the end. 
- if self.y_mo is not None: - n_new = len(y_next) - # Check if y_mo has the new points appended - if len(self.y_mo) >= n_new: - # The new points are at the end of y_mo - y_mo_new = self.y_mo[-n_new:] - y_mo_old = self.y_mo[:-n_new] - - # Filter the new MO points using the mask from y_next - y_mo_new_clean = y_mo_new[finite_mask] - - # Reconstruct y_mo - if len(y_mo_old) > 0: - self.y_mo = ( - np.vstack([y_mo_old, y_mo_new_clean]) - if len(y_mo_new_clean) > 0 - else y_mo_old - ) - else: - self.y_mo = y_mo_new_clean - else: - if self.verbose: - print( - "Warning: y_mo size inconsistent with new points in _handle_NA_new_points" - ) - - # Skip this iteration if all new points were NaN/inf - if len(y_next_clean) == 0: - if self.verbose: - print( - f"Warning: All new evaluations were NaN/inf, skipping iteration {self.n_iter_}" - ) - return None, None - - return X_next_clean, y_next_clean - - def _update_best_main_loop( - self, - x_next_repeated: np.ndarray, - y_next: np.ndarray, - start_time: Optional[float] = None, - ) -> None: - """Update best solution found during main optimization loop. - Checks if any new evaluations improve upon the current best solution. - If improvement is found, updates best_x_ and best_y_ attributes and - prints progress if verbose mode is enabled. - - Args: - x_next_repeated (ndarray): Design points that were evaluated in transformed space, - shape (n_eval, n_features). - y_next (ndarray): Function values at x_next_repeated, shape (n_eval,). - - Examples: - >>> import numpy as np - >>> from spotoptim import SpotOptim - >>> opt = SpotOptim( - ... fun=lambda X: np.sum(X**2, axis=1), - ... bounds=[(-5, 5), (-5, 5)], - ... n_initial=5, - ... verbose=True - ... 
) - >>> # Simulate optimization state - >>> opt.n_iter_ = 1 - >>> opt.best_x_ = np.array([1.0, 1.0]) - >>> opt.best_y_ = 2.0 - >>> - >>> # Case 1: New best found - >>> x_new = np.array([[0.1, 0.1], [0.5, 0.5]]) - >>> y_new = np.array([0.02, 0.5]) - >>> opt._update_best_main_loop(x_new, y_new) - Iteration 1: New best f(x) = 0.020000 - >>> opt.best_y_ - 0.02 - >>> - >>> # Case 2: No improvement - >>> opt.n_iter_ = 2 - >>> x_no_improve = np.array([[1.5, 1.5]]) - >>> y_no_improve = np.array([4.5]) - >>> opt._update_best_main_loop(x_no_improve, y_no_improve) - Iteration 2: f(x) = 4.500000 - >>> - >>> # Case 3: With noisy function - >>> opt_noise = SpotOptim( - ... fun=lambda X: np.sum(X**2, axis=1), - ... bounds=[(-5, 5), (-5, 5)], - ... repeats_surrogate=2, - ... verbose=True - ... ) - >>> opt_noise.n_iter_ = 1 - >>> opt_noise.best_y_ = 2.0 - >>> opt_noise.min_mean_y = 1.5 - >>> y_noise = np.array([0.5]) - >>> x_noise = np.array([[0.5, 0.5]]) - >>> opt_noise._update_best_main_loop(x_noise, y_noise) - Iteration 1: New best f(x) = 0.500000, mean best: f(x) = 1.500000 - """ - # Update best - # Determine global best value for printing if shared variable exists - global_best_val = None - if hasattr(self, "shared_best_y") and self.shared_best_y is not None: - # Sync with global shared value - lock_obj = getattr(self, "shared_lock", None) - if lock_obj is not None: - with lock_obj: - if ( - self.best_y_ is not None - and self.best_y_ < self.shared_best_y.value - ): - self.shared_best_y.value = self.best_y_ - - min_y_next = np.min(y_next) - if min_y_next < self.shared_best_y.value: - self.shared_best_y.value = min_y_next - - global_best_val = self.shared_best_y.value - - current_best = np.min(y_next) - if current_best < self.best_y_: - best_idx_in_new = np.argmin(y_next) - # x_next_repeated is in transformed space, convert to original for storage - self.best_x_ = self.inverse_transform_X( - x_next_repeated[best_idx_in_new].reshape(1, -1) - )[0] - self.best_y_ = current_best - 
- if self.verbose: - # Calculate progress - if self.max_time != np.inf and start_time is not None: - progress = (time.time() - start_time) / (self.max_time * 60) * 100 - progress_str = f"Time: {progress:.1f}%" - else: - prev_evals = sum(res.nfev for res in self.restarts_results_) - progress = (prev_evals + self.counter) / self.max_iter * 100 - progress_str = f"Evals: {progress:.1f}%" - - msg = f"Iter {self.n_iter_}" - if global_best_val is not None: - msg += f" | GlobalBest: {global_best_val:.6f}" - msg += f" | Best: {self.best_y_:.6f} | Rate: {self.success_rate:.2f} | {progress_str}" - - if (self.repeats_initial > 1) or (self.repeats_surrogate > 1): - msg += f" | Mean Best: {self.min_mean_y:.6f}" - - print(msg) - elif self.verbose: - if self.max_time != np.inf and start_time is not None: - progress = (time.time() - start_time) / (self.max_time * 60) * 100 - progress_str = f"Time: {progress:.1f}%" - else: - prev_evals = sum(res.nfev for res in self.restarts_results_) - progress = (prev_evals + self.counter) / self.max_iter * 100 - progress_str = f"Evals: {progress:.1f}%" - - current_val = np.min(y_next) - msg = f"Iter {self.n_iter_}" - if global_best_val is not None: - msg += f" | GlobalBest: {global_best_val:.6f}" - msg += f" | Best: {self.best_y_:.6f} | Curr: {current_val:.6f} | Rate: {self.success_rate:.2f} | {progress_str}" - - if (self.repeats_initial > 1) or (self.repeats_surrogate > 1): - mean_y_new = np.mean(y_next) - msg += f" | Mean Curr: {mean_y_new:.6f}" - print(msg) - - def determine_termination(self, timeout_start: float) -> str: - """Determine termination reason for optimization. - Checks the termination conditions and returns an appropriate message - indicating why the optimization stopped. Three possible termination - conditions are checked in order of priority: - 1. Maximum number of evaluations reached - 2. Maximum time limit exceeded - 3. 
Successful completion (neither limit reached) - - Args: - timeout_start (float): Start time of optimization (from time.time()). - - Returns: - str: Message describing the termination reason. - - Examples: - ```{python} - import numpy as np - import time - from spotoptim import SpotOptim - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - max_iter=10, - max_time=10.0 - ) - # Case 1: Maximum evaluations reached - opt.y_ = np.zeros(20) # Simulate 20 evaluations - start_time = time.time() - msg = opt.determine_termination(start_time) - print(msg) - ``` - ```{python} - # Case 2: Time limit exceeded - import numpy as np - import time - from spotoptim import SpotOptim - opt.y_ = np.zeros(10) # Only 10 evaluations - start_time = time.time() - 700 # Simulate 11.67 minutes elapsed - msg = opt.determine_termination(start_time) - print(msg) - ``` - ```{python} - # Case 3: Successful completion - import numpy as np - import time - from spotoptim import SpotOptim - opt.y_ = np.zeros(10) # Under max_iter - start_time = time.time() # Just started - msg = opt.determine_termination(start_time) - print(msg) - ``` - """ - # Determine termination reason - elapsed_time = time.time() - timeout_start - if len(self.y_) >= self.max_iter: - message = f"Optimization terminated: maximum evaluations ({self.max_iter}) reached" - elif elapsed_time >= self.max_time * 60: - message = ( - f"Optimization terminated: time limit ({self.max_time:.2f} min) reached" - ) - else: - message = "Optimization finished successfully" + else: + prog_val = ( + len(self.y_) / effective_max_iter * 100 + ) + progress_str = f"Evals: {prog_val:.1f}%" - return message + print( + f"Iter {len(self.y_)}/{effective_max_iter}" + f" | Best: {self.best_y_:.6f}" + f" | Rate: {self.success_rate:.2f}" + f" | {progress_str}" + ) - def apply_ocba(self) -> Optional[np.ndarray]: - """Apply Optimal Computing Budget Allocation for noisy functions. 
- Determines which existing design points should be re-evaluated based on - OCBA algorithm. This method computes optimal budget allocation to improve - the quality of the estimated best design. + # Single surrogate refit for the whole batch — + # held under the lock so in-flight search threads + # do not read a partially-updated model. + with _surrogate_lock: + self._fit_scheduler() - Returns: - Optional[ndarray]: - Array of design points to re-evaluate, shape (n_re_eval, n_features). - Returns None if OCBA conditions are not met or OCBA is disabled. + except Exception as e: + _future_n_pts.pop(fut, None) + if self.verbose: + print(f"Error processing future: {e}") - Note: - OCBA is only applied when: + return "FINISHED", OptimizeResult( + x=self.best_x_, + fun=self.best_y_, + nfev=len(self.y_), + nit=self.n_iter_, + success=True, + message="Optimization finished (Steady State)", + X=self.X_, + y=self.y_, + ) - * (self.repeats_initial > 1) or (self.repeats_surrogate > 1) - * self.ocba_delta > 0 - * All variances are > 0 - * At least 3 design points exist - Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1) + np.random.normal(0, 0.1, X.shape[0]), - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - repeats_surrogate=2, - ocba_delta=5, - verbose=True - ) - # Simulate optimization state (normally done in optimize()) - opt.mean_X = np.array([[1, 2], [0, 0], [2, 1]]) - opt.mean_y = np.array([5.0, 0.1, 5.0]) - opt.var_y = np.array([0.1, 0.05, 0.15]) - X_ocba = opt.apply_ocba() - # OCBA: Adding 5 re-evaluation(s). 
- # The following should be true: - print(X_ocba.shape[0] == 5) - ``` + # ==================== + # TASK_STATS: + # init_storage() + # update_storage() + # update_stats() + # update_success_rate() + # get_success_rate() + # aggregate_mean_var() + # get_best_hyperparameters + # ==================== - OCBA skipped - insufficient points - ```{python} - import numpy as np - from spotoptim import SpotOptim - opt2 = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - repeats_surrogate=2, - ocba_delta=5, - verbose=True - ) - opt2.mean_X = np.array([[1, 2], [0, 0]]) - opt2.mean_y = np.array([5.0, 0.1]) - opt2.var_y = np.array([0.1, 0.05]) - X_ocba = opt2.apply_ocba() - # Warning: OCBA skipped (need >2 points with variance > 0) - print(X_ocba is None) - ``` + def get_best_hyperparameters( + self, as_dict: bool = True + ) -> Union[Dict[str, Any], np.ndarray, None]: """ - # OCBA: Compute optimal budget allocation for noisy functions - # This determines which existing design points should be re-evaluated - X_ocba = None - if ( - self.repeats_initial > 1 or self.repeats_surrogate > 1 - ) and self.ocba_delta > 0: - # Check conditions for OCBA (need variance > 0 and at least 3 points) - if not np.all(self.var_y > 0) and (self.mean_X.shape[0] <= 2): - if self.verbose: - print("Warning: OCBA skipped (need >2 points with variance > 0)") - elif np.all(self.var_y > 0) and (self.mean_X.shape[0] > 2): - # Get OCBA allocation - X_ocba = self.get_ocba_X( - self.mean_X, - self.mean_y, - self.var_y, - self.ocba_delta, - verbose=self.verbose, - ) - if self.verbose and X_ocba is not None: - print(f" OCBA: Adding {X_ocba.shape[0]} re-evaluation(s)") - - return X_ocba - - def apply_penalty_NA( - self, - y: np.ndarray, - y_history: Optional[np.ndarray] = None, - penalty_value: Optional[float] = None, - sd: float = 0.1, - ) -> np.ndarray: - """Replace NaN and infinite values with penalty plus random noise. - Used in the optimize() method after function evaluations. 
- This method follows the approach from spotpython.utils.repair.apply_penalty_NA, - replacing NaN/inf values with a penalty value plus random noise to avoid - identical penalty values. + Get the best hyperparameter configuration found during optimization. + If noise handling is active (repeats_initial > 1 or OCBA), this returns the parameter + configuration associated with the best *mean* objective value. Otherwise, it returns + the configuration associated with the absolute best observed value. Args: - y (ndarray): Array of objective function values to be repaired. - y_history (ndarray, optional): Historical objective function values used for - computing penalty statistics. If None, uses y itself. Default is None. - penalty_value (float, optional): Value to replace NaN/inf with. - If None, computes penalty as: max(finite_y_history) + 3 * std(finite_y_history). - If all values are NaN/inf or only one finite value exists, falls back - to self.penalty_val. Default is None. - sd (float): Standard deviation for normal distributed random noise added to penalty. - Default is 0.1. + as_dict (bool, optional): If True, returns a dictionary mapping parameter names + to their values. If False, returns the raw numpy array. Defaults to True. Returns: - ndarray: - Array with NaN/inf replaced by penalty_value + random noise - (normal distributed with mean 0 and standard deviation sd). + Union[Dict[str, Any], np.ndarray, None]: The best hyperparameter configuration. + Returns None if optimization hasn't started (no data). 
Examples: ```{python} - import numpy as np from spotoptim import SpotOptim - opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), bounds=[(-5, 5)]) - y_hist = np.array([1.0, 2.0, 3.0, 5.0]) - y_new = np.array([4.0, np.nan, np.inf]) - y_clean = opt.apply_penalty_NA(y_new, y_history=y_hist) - print(f"np.all(np.isfinite(y_clean)): {np.all(np.isfinite(y_clean))}") - print(f"y_clean: {y_clean}") - # NaN/inf replaced with worst value from history + 3*std + noise - print(f"y_clean[1] > 5.0: {y_clean[1] > 5.0}") # Should be larger than max finite value in history + from spotoptim.function import sphere + + opt = SpotOptim(fun=sphere, + bounds=[(-5, 5), (0, 10)], + n_initial=5, + var_name=["x", "y"], + verbose=True) + opt.optimize() + best_params = opt.get_best_hyperparameters() + print(best_params['x']) # Should be close to 0 ``` """ + if self.X_ is None or len(self.X_) == 0: + return None - # Ensure y is a float array (maps non-convertible values like "error" or None to nan) - def _safe_float(v): - try: - return float(v) - except (ValueError, TypeError): - return np.nan - - y_flat = np.array(y).flatten() - y = np.array([_safe_float(v) for v in y_flat]) - # Identify NaN and inf values in y - mask = ~np.isfinite(y) - - if np.any(mask): - n_bad = np.sum(mask) + # Determine which "best" to use + if (self.repeats_initial > 1 or self.repeats_surrogate > 1) and hasattr( + self, "min_mean_X" + ): + best_x = self.min_mean_X + else: + best_x = self.best_x_ - # Compute penalty_value if not provided - if penalty_value is None: - # Get finite values from history for statistics - # Use y_history if provided, otherwise fall back to y itself - if y_history is not None: - finite_values = y_history[np.isfinite(y_history)] - else: - # Use current y values - finite_values = y[~mask] + if not as_dict: + return best_x - # If we have at least 2 finite values, compute adaptive penalty - if len(finite_values) >= 2: - max_y = np.max(finite_values) - std_y = np.std(finite_values, ddof=1) - 
penalty_value = max_y + 3.0 * std_y + # Map factors using existing method (handles 2D, returns 2D) + # We pass best_x as (1, D) and get (1, D) back + mapped_x = self.map_to_factor_values(best_x.reshape(1, -1))[0] - if self.verbose: - print( - f"Warning: Found {n_bad} NaN/inf value(s), replacing with " - f"adaptive penalty (max + 3*std = {penalty_value:.4f})" - ) - else: - # Fallback to self.penalty if insufficient finite values - if self.penalty_val is not None: - penalty_value = self.penalty_val - elif len(finite_values) == 1: - # Use the single finite value + a large constant - penalty_value = finite_values[0] + 1000.0 - else: - # All values are NaN/inf, use a large default - penalty_value = 1e10 + # Convert to dictionary with types + params = {} + names = ( + self.var_name if self.var_name else [f"p{i}" for i in range(len(best_x))] + ) - if self.verbose: - print( - f"Warning: Found {n_bad} NaN/inf value(s), insufficient finite values " - f"for adaptive penalty. Using penalty_value = {penalty_value}" - ) - else: - if self.verbose: - print( - f"Warning: Found {n_bad} NaN/inf value(s), replacing with {penalty_value} + noise" - ) + for i, name in enumerate(names): + val = mapped_x[i] - # Generate random noise and add to penalty - random_noise = self.rng.normal(0, sd, y.shape) - penalty_values = penalty_value + random_noise + # Handle types if available (specifically int, as factors are already mapped) + if self.var_type: + v_type = self.var_type[i] + if v_type == "int": + val = int(round(val)) - # Replace NaN/inf with penalty + noise - y[mask] = penalty_values[mask] + params[name] = val - return y + return params - # ==================== - # Storage & Statistics - # ==================== def init_storage(self, X0: np.ndarray, y0: np.ndarray) -> None: """Initialize storage for optimization. 
@@ -6016,7 +6113,6 @@ def aggregate_mean_var( self, X: np.ndarray, y: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Aggregate X and y values to compute mean and variance per group. - For repeated evaluations at the same design point, this method computes the mean function value and variance (using population variance, ddof=0). @@ -6077,7 +6173,21 @@ def aggregate_mean_var( return X_agg, y_mean, y_var # ==================== - # Results & Analysis + # TASK_RESULTS: + # save_result() + # load_result() + # save_experiment() + # load_experiment() + # get_result_filename() + # get_experiment_filename() + # print_results() + # print_best() + # get_results_table() + # get_design_table() + # gen_design_table() + # get_importance() + # sensitivity_spearman() + # get_stars() # ==================== def save_result( @@ -6163,7 +6273,6 @@ def sphere(X): @staticmethod def load_result(filename: str) -> "SpotOptim": """Load complete optimization results from a pickle file (suffix '_res.pkl') - Loads results that were saved with save_result(). The loaded optimizer will have both configuration and all optimization results. 
@@ -7113,7 +7222,14 @@ def test_func(X): return output_list # ==================== - # TensorBoard Integration + # TASK_TENSORBOARD: + # _clean_tensorboard_logs() + # _init_tensorboard_writer() + # _write_tensorboard_scalars() + # _write_tensorboard_hparams() + # _close_tensorboard_writer() + # init_tensorboard() + # _close_and_del_tensorboard_writer() # ==================== def _clean_tensorboard_logs(self) -> None: @@ -7348,7 +7464,13 @@ def _close_and_del_tensorboard_writer(self) -> None: self.tb_writer = None # ==================== - # Plotting + # TASK_PLOT: + # plot_progress() + # plot_surrogate() + # plot_important_hyperparameter_contour() + # _plot_surrogate_with_factors() + # plot_importance() + # plot_parameter_scatter() # ==================== def plot_progress( diff --git a/tests/test_optimize_refactored_methods.py b/tests/test_optimize_refactored_methods.py index 60d1850..fe6b7c0 100644 --- a/tests/test_optimize_refactored_methods.py +++ b/tests/test_optimize_refactored_methods.py @@ -156,7 +156,7 @@ def test_handle_na_removes_nan_values(self): X0 = np.array([[1, 2], [3, 4], [5, 6]]) y0 = np.array([5.0, np.nan, 61.0]) - X0_clean, y0_clean, n_eval = opt.rm_NA_values(X0, y0) + X0_clean, y0_clean, n_eval = opt.rm_initial_design_NA_values(X0, y0) assert X0_clean.shape == (2, 2) assert len(y0_clean) == 2 @@ -174,7 +174,7 @@ def test_handle_na_removes_inf_values(self): X0 = np.array([[1, 2], [3, 4], [5, 6]]) y0 = np.array([5.0, np.inf, 61.0]) - X0_clean, y0_clean, n_eval = opt.rm_NA_values(X0, y0) + X0_clean, y0_clean, n_eval = opt.rm_initial_design_NA_values(X0, y0) assert X0_clean.shape == (2, 2) assert len(y0_clean) == 2 @@ -191,7 +191,7 @@ def test_handle_na_all_valid_values(self): X0 = np.array([[1, 2], [3, 4], [5, 6]]) y0 = np.array([5.0, 25.0, 61.0]) - X0_clean, y0_clean, n_eval = opt.rm_NA_values(X0, y0) + X0_clean, y0_clean, n_eval = opt.rm_initial_design_NA_values(X0, y0) assert X0_clean.shape == (3, 2) assert len(y0_clean) == 3 diff --git
a/tests/test_run_sequential_loop_example.py b/tests/test_run_sequential_loop_example.py index f1ea90f..4cfb6b4 100644 --- a/tests/test_run_sequential_loop_example.py +++ b/tests/test_run_sequential_loop_example.py @@ -21,7 +21,7 @@ def test_run_sequential_loop_example(): ) X0, y0 = opt._initialize_run(X0=None, y0_known=None) - X0, y0, n_evaluated = opt.rm_NA_values(X0, y0) + X0, y0, n_evaluated = opt.rm_initial_design_NA_values(X0, y0) opt.check_size_initial_design(y0, n_evaluated) opt.init_storage(X0, y0) opt._zero_success_count = 0