diff --git a/.github/workflows/release-preflight.yml b/.github/workflows/release-preflight.yml index 988df5f..a80e0c0 100644 --- a/.github/workflows/release-preflight.yml +++ b/.github/workflows/release-preflight.yml @@ -55,15 +55,21 @@ jobs: exit 1 fi - - name: Configure authenticated Git remote + - name: Configure Git credentials env: RELEASE_TOKEN: ${{ secrets.SEMANTIC_RELEASE_TOKEN }} run: | - git remote set-url origin https://x-access-token:${RELEASE_TOKEN}@github.com/${{ github.repository }}.git + set -euo pipefail + # Disable any credential helpers the runner may have configured + # (gh CLI, credential-manager, etc.) that can intercept push auth. + git config --local --unset-all credential.helper || true + git config --local credential.helper '' + # Use the extraheader method — the same mechanism actions/checkout + # uses internally. Works reliably for both fetch and push. + BASIC=$(echo -n "x-access-token:${RELEASE_TOKEN}" | base64 | tr -d '\n') + git config --local http.https://github.com/.extraheader "AUTHORIZATION: basic ${BASIC}" - name: Preflight token and push checks - env: - RELEASE_TOKEN: ${{ secrets.SEMANTIC_RELEASE_TOKEN }} run: | set -euo pipefail @@ -73,9 +79,9 @@ jobs: echo "Checking write access via dry-run push to unprotected ref..." test_ref="refs/preflight/sr-${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}" if ! git push --dry-run origin "HEAD:${test_ref}" 2>&1; then - echo "::error::SEMANTIC_RELEASE_TOKEN exists but cannot push to ${GITHUB_REPOSITORY}." + echo "::error::SEMANTIC_RELEASE_TOKEN cannot push to ${GITHUB_REPOSITORY}." echo "::error::Ensure the PAT has Contents: Read and write (fine-grained) or 'repo' scope (classic)." - echo "::error::For org repos, also verify the fine-grained PAT was approved in org settings." + echo "::error::For org repos, verify the fine-grained PAT was approved in org settings." 
exit 1 fi diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ac0df6c..b3e22d5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -103,11 +103,15 @@ jobs: @semantic-release/exec@6 \ conventional-changelog-conventionalcommits@7 - - name: Configure authenticated Git remote + - name: Configure Git credentials env: RELEASE_TOKEN: ${{ secrets.SEMANTIC_RELEASE_TOKEN || github.token }} run: | - git remote set-url origin https://x-access-token:${RELEASE_TOKEN}@github.com/${{ github.repository }}.git + set -euo pipefail + git config --local --unset-all credential.helper || true + git config --local credential.helper '' + BASIC=$(echo -n "x-access-token:${RELEASE_TOKEN}" | base64 | tr -d '\n') + git config --local http.https://github.com/.extraheader "AUTHORIZATION: basic ${BASIC}" - name: Semantic Release env: diff --git a/docs/spotoptim_class.qmd b/docs/spotoptim_class.qmd new file mode 100644 index 0000000..24d3ec3 --- /dev/null +++ b/docs/spotoptim_class.qmd @@ -0,0 +1,137 @@ +--- +title: "SpotOptim Class" +description: "Structure of the Methods" +--- + + +## Overview + +# TASK_VARS: + +* detect_var_type +* modify_bounds_based_on_var_type +* repair_non_numeric +* handle_default_var_trans +* process_factor_bounds + + +# TASK_SAVE_LOAD: + +* get_pickle_safe_optimizer +* reinitialize_components + +# TASK_DIM: + +* setup_dimension_reduction() +* to_red_dim() +* to_all_dim() + +# TASK_TRANSFORM: + +* transform_value() +* inverse_transform_value() +* transform_X() +* inverse_transform_X() +* transform_bounds() +* map_to_factor_values() + +# TASK_INIT_DESIGN: + +* get_initial_design() +* generate_initial_design() +* curate_initial_design() +* rm_initial_design_NA_values() +* validate_x0() +* check_size_initial_design() +* get_best_xy_initial_design() + +# TASK_Surrogate: + +* init_surrogate() +* _fit_surrogate() +* _fit_scheduler() + +# TASK_PREDICT: + +* _predict_with_uncertainty() +* _acquisition_function() + +# TASK_OPTIM: + +* 
evaluate_function() +* _optimize_acquisition_tricands() +* _prepare_de_kwargs() +* _optimize_acquisition_de() +* _optimize_acquisition_scipy() +* _try_optimizer_candidates() +* remove_nan() +* _handle_acquisition_failure() +* _try_fallback_strategy() +* get_shape() +* optimize_acquisition_func() +* _optimize_run_task() +* optimize() +* execute_optimization_run() + +# TASK_MO: + +* store_mo() +* mo2so() + +# TASK_OCBA: + +* apply_ocba() +* get_ranks() +* get_ocba() +* get_ocba_X() + +# TASK_SELECT: + +* select_distant_points() +* select_best_cluster() +* _selection_dispatcher() +* select_new() +* suggest_next_infill_point() + +# TASK_OPTIM_SEQ: + +* determine_termination() +* apply_penalty_NA() +* update_best_main_loop() +* handle_NA_new_points() +* optimize_sequential_run() +* _initialize_run() +* update_repeats_infill_points() +* _run_sequential_loop() + +# TASK_OPTIM_PARALLEL: + +* _update_storage_steady() +* optimize_steady_state() + +# TASK_STATS: + +* init_storage() +* update_storage() +* update_stats() +* update_success_rate() +* get_success_rate() +* aggregate_mean_var() +* get_best_hyperparameters + +# TASK_RESULTS: + +* save_result() +* load_result() +* save_experiment() +* load_experiment() +* get_result_filename() +* get_experiment_filename() +* print_results() +* print_best() +* get_results_table() +* get_design_table() +* gen_design_table() +* get_importance() +* sensitivity_spearman() +* get_stars() \ No newline at end of file diff --git a/src/spotoptim/SpotOptim.py b/src/spotoptim/SpotOptim.py index 48d9e7f..4422769 100644 --- a/src/spotoptim/SpotOptim.py +++ b/src/spotoptim/SpotOptim.py @@ -1036,11 +1036,11 @@ def set_seed(self) -> None: # ==================== # TASK_VARS: - # detect_var_type - # modify_bounds_based_on_var_type - # repair_non_numeric - # handle_default_var_trans - # process_factor_bounds + # * detect_var_type + # * modify_bounds_based_on_var_type + # * repair_non_numeric + # * handle_default_var_trans + # * process_factor_bounds # 
==================== def detect_var_type(self) -> list: @@ -1267,8 +1267,8 @@ def process_factor_bounds(self) -> None: # ==================== # TASK_SAVE_LOAD: - # get_pickle_safe_optimizer - # reinitialize_components + # * get_pickle_safe_optimizer + # * reinitialize_components # ==================== def get_pickle_safe_optimizer( @@ -1379,9 +1379,9 @@ def reinitialize_components(self) -> None: # ==================== # TASK_DIM: - # setup_dimension_reduction() - # to_red_dim() - # to_all_dim() + # * setup_dimension_reduction() + # * to_red_dim() + # * to_all_dim() # ==================== def setup_dimension_reduction(self) -> None: @@ -1570,12 +1570,12 @@ def sphere(X): # ==================== # TASK_TRANSFORM: - # transform_value() - # inverse_transform_value() - # transform_X() - # inverse_transform_X() - # transform_bounds() - # map_to_factor_values() + # * transform_value() + # * inverse_transform_value() + # * transform_X() + # * inverse_transform_X() + # * transform_bounds() + # * map_to_factor_values() # ==================== def transform_value(self, x: float, trans: Optional[str]) -> float: @@ -1909,13 +1909,13 @@ def map_to_factor_values(self, X: np.ndarray) -> np.ndarray: # ==================== # TASK_INIT_DESIGN: - # get_initial_design() - # generate_initial_design() - # curate_initial_design() - # rm_initial_design_NA_values() - # validate_x0() - # check_size_initial_design() - # get_best_xy_initial_design() + # * get_initial_design() + # * generate_initial_design() + # * curate_initial_design() + # * rm_initial_design_NA_values() + # * validate_x0() + # * check_size_initial_design() + # * get_best_xy_initial_design() # ==================== def get_initial_design(self, X0: Optional[np.ndarray] = None) -> np.ndarray: @@ -2470,9 +2470,9 @@ def get_best_xy_initial_design(self) -> None: # ==================== # TASK_Surrogate: - # init_surrogate() - # _fit_surrogate() - # _fit_scheduler() + # * init_surrogate() + # * _fit_surrogate() + # *_fit_scheduler() # 
==================== def init_surrogate(self) -> None: @@ -2730,8 +2730,8 @@ def _fit_scheduler(self) -> None: # ==================== # TASK_PREDICT: - # _predict_with_uncertainty() - # _acquisition_function() + # * _predict_with_uncertainty() + # * _acquisition_function() # ==================== def _predict_with_uncertainty(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: @@ -2858,16 +2858,20 @@ def _acquisition_function(self, x: np.ndarray) -> np.ndarray: # ==================== # TASK_OPTIM: - # evaluate_function() - # _optimize_acquisition_tricands() - # _prepare_de_kwargs() - # _optimize_acquisition_de() - # _optimize_acquisition_scipy() - # _try_optimizer_candidates() - # remove_nan() - # _handle_acquisition_failure() - # _try_fallback_strategy() - # get_shape() + # * evaluate_function() + # * _optimize_acquisition_tricands() + # * _prepare_de_kwargs() + # * _optimize_acquisition_de() + # * _optimize_acquisition_scipy() + # * _try_optimizer_candidates() + # * remove_nan() + # * _handle_acquisition_failure() + # * _try_fallback_strategy() + # * get_shape() + # * optimize_acquisition_func() + # * _optimize_run_task() + # * optimize() + # * execute_optimization_run() # ==================== def evaluate_function(self, X: np.ndarray) -> np.ndarray: @@ -3510,986 +3514,976 @@ def get_shape(self, y: np.ndarray) -> Tuple[int, Optional[int]]: return y.size, None - # ==================== - # TASK_MO: - # store_mo() - # mo2so() - # ==================== - - - def store_mo(self, y_mo: np.ndarray) -> None: - """Store multi-objective values in self.y_mo. - If multi-objective values are present (ndim==2), they are stored in self.y_mo. - New values are appended to existing ones. For single-objective problems, - self.y_mo remains None. + def optimize_acquisition_func(self) -> np.ndarray: + """Optimize the acquisition function to find the next point to evaluate. - Args: - y_mo (ndarray): If multi-objective, shape (n_samples, n_objectives). 
- If single-objective, shape (n_samples,). + Returns: + ndarray: The optimized point(s). + If acquisition_fun_return_size == 1, returns 1D array of shape (n_features,). + If acquisition_fun_return_size > 1, returns 2D array of shape (N, n_features), + where N is min(acquisition_fun_return_size, population_size). Examples: ```{python} import numpy as np from spotoptim import SpotOptim + def sphere(X): + X = np.atleast_2d(X) + return np.sum(X**2, axis=1) opt = SpotOptim( - fun=lambda X: np.column_stack([ - np.sum(X**2, axis=1), - np.sum((X-1)**2, axis=1) - ]), + fun=sphere, bounds=[(-5, 5), (-5, 5)], + n_initial=5, max_iter=10, - n_initial=5 + seed=0, ) - y_mo_1 = np.array([[1.0, 2.0], [3.0, 4.0]]) - opt.store_mo(y_mo_1) - print(f"y_mo after first call: {opt.y_mo}") - y_mo_2 = np.array([[5.0, 6.0], [7.0, 8.0]]) - opt.store_mo(y_mo_2) - print(f"y_mo after second call: {opt.y_mo}") + opt.optimize() + x_next = opt.suggest_next_infill_point() + print("Next point to evaluate:", x_next) ``` """ - # Store y_mo in self.y_mo (append new values) if multi-objective - if self.y_mo is None and y_mo.ndim == 2: - self.y_mo = y_mo - elif y_mo.ndim == 2: - self.y_mo = np.vstack([self.y_mo, y_mo]) + if self.acquisition_optimizer == "tricands": + return self._optimize_acquisition_tricands() + elif self.acquisition_optimizer == "differential_evolution": + return self._optimize_acquisition_de() + elif self.acquisition_optimizer == "de_tricands": + val = self.rng.rand() + if val < self.prob_de_tricands: + return self._optimize_acquisition_de() + else: + return self._optimize_acquisition_tricands() + else: + return self._optimize_acquisition_scipy() - def mo2so(self, y_mo: np.ndarray) -> np.ndarray: - """Convert multi-objective values to single-objective. - Converts multi-objective values to a single-objective value by applying a user-defined - function from `fun_mo2so`. If no user-defined function is given, the - values in the first objective column are used. 
- This method is called after the objective function evaluation. It returns a 1D array - with the single-objective values. + def _optimize_run_task( + self, + seed: int, + timeout_start: float, + X0: Optional[np.ndarray], + y0_known_val: Optional[float], + max_iter_override: Optional[int], + shared_best_y=None, # Accept shared value + shared_lock=None, # Accept shared lock + ) -> Tuple[str, OptimizeResult]: + """Helper to run a single optimization task with a specific seed. Calls _optimize_single_run. Args: - y_mo (ndarray): If multi-objective, shape (n_samples, n_objectives). - If single-objective, shape (n_samples,). + seed (int): Seed for this run. + timeout_start (float): Start time for timeout. + X0 (Optional[np.ndarray]): Initial design points in Natural Space, shape (n_initial, n_features). + y0_known_val (Optional[float]): Known best value for initial design. + max_iter_override (Optional[int]): Override for maximum number of iterations. + shared_best_y (Optional[float]): Shared best value for parallel runs. + shared_lock (Optional[Lock]): Shared lock for parallel runs. Returns: - ndarray: Single-objective values, shape (n_samples,). + Tuple[str, OptimizeResult]: Tuple containing status and optimization result. 
+ """ + # Set the seed for this run + self.seed = seed + self.set_seed() - Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim + # Re-initialize LHS sampler with new seed to ensure diversity in initial design + if hasattr(self, "n_dim"): + self.lhs_sampler = LatinHypercube(d=self.n_dim, rng=self.seed) - # Multi-objective function - def mo_fun(X): - return np.column_stack([ - np.sum(X**2, axis=1), - np.sum((X-1)**2, axis=1) - ]) + return self._optimize_single_run( + timeout_start, + X0, + y0_known=y0_known_val, + max_iter_override=max_iter_override, + shared_best_y=shared_best_y, + shared_lock=shared_lock, + ) - # Example 1: Default behavior (use first objective) - opt1 = SpotOptim( - fun=mo_fun, - bounds=[(-5, 5), (-5, 5)], - max_iter=10, - n_initial=5 - ) - y_mo = np.array([[1.0, 2.0], [3.0, 4.0]]) - y_so = opt1.mo2so(y_mo) - print(f"Single-objective (default): {y_so}") - ``` + def optimize(self, X0: Optional[np.ndarray] = None) -> OptimizeResult: + """Run the optimization process. The optimization terminates when either the total function evaluations reach + `max_iter` (including initial design), or the runtime exceeds max_time minutes. Input/Output spaces are + * Input `X0`: Expected in Natural Space (original scale, physical units). + * Output `result.x`: Returned in Natural Space. + * Output `result.X`: Returned in Natural Space. + * Internal Optimization: Performed in Transformed and Mapped Space. + + Args: + X0 (ndarray, optional): Initial design points in Natural Space, shape (n_initial, n_features). + If None, generates space-filling design. Defaults to None. 
+ + Returns: + OptimizeResult: Optimization result with fields: + * x: best point found in Natural Space + * fun: best function value + * nfev: number of function evaluations (including initial design) + * nit: number of sequential optimization iterations (after initial design) + * success: whether optimization succeeded + * message: termination message indicating reason for stopping, including statistics (function value, iterations, evaluations) + * X: all evaluated points in Natural Space + * y: all function values + Examples: ```{python} import numpy as np from spotoptim import SpotOptim - # Example 2: Custom conversion function (sum of objectives) - def custom_mo2so(y_mo): - return y_mo[:, 0] + y_mo[:, 1] - - opt2 = SpotOptim( - fun=mo_fun, + from spotoptim.function import sphere + opt = SpotOptim( + fun=sphere, bounds=[(-5, 5), (-5, 5)], - max_iter=10, n_initial=5, - fun_mo2so=custom_mo2so + max_iter=10, + seed=0, + x0=np.array([0.1, -0.1]), + verbose=True ) - y_so_custom = opt2.mo2so(y_mo) - print(f"Single-objective (custom): {y_so_custom}") + result = opt.optimize() + print(result.message.splitlines()[0]) + print("Best point:", result.x) + print("Best value:", result.fun) ``` """ - n, m = self.get_shape(y_mo) - self.store_mo(y_mo) + # Track results across restarts for final aggregation. + self.restarts_results_ = [] + # Capture start time for timeout enforcement. + timeout_start = time.time() - # Use ndim to check if multi-objective - if y_mo.ndim == 2: - if self.fun_mo2so is not None: - # Apply user-defined conversion function - y0 = self.fun_mo2so(y_mo) - else: - # Default: use first column - if y_mo.size > 0: - y0 = y_mo[:, 0] - else: - y0 = y_mo - else: - # Single-objective, return as-is - y0 = y_mo + # Initial run state. 
+ current_X0 = X0 + status = "START" - return y0 + while True: + # Get best result so far if we have results + best_res = ( + min(self.restarts_results_, key=lambda x: x.fun) + if self.restarts_results_ + else None + ) - # ==================== - # TASK_OCBA: - # apply_ocba() - # get_ranks() - # get_ocba() - # get_ocba_X() - # ==================== + # Compute injected best value for restarts, then run one optimization cycle. + # y0_known_val carries the current global best objective so the next + # run can skip re-evaluating that known point when restart injection is on. + y0_known_val = ( + best_res.fun + if ( + status == "RESTART" + and self.restart_inject_best + and self.restarts_results_ + ) + else None + ) + # Calculate remaining budget + total_evals_so_far = sum(len(r.y) for r in self.restarts_results_) + remaining_iter = self.max_iter - total_evals_so_far - def apply_ocba(self) -> Optional[np.ndarray]: - """Apply Optimal Computing Budget Allocation for noisy functions. - Determines which existing design points should be re-evaluated based on - OCBA algorithm. This method computes optimal budget allocation to improve - the quality of the estimated best design. + # If we don't have enough budget for at least initial design (or some minimal amount), stop + if remaining_iter < self.n_initial: + if self.verbose: + print("Global budget exhausted. Stopping restarts.") + break - Returns: - Optional[ndarray]: - Array of design points to re-evaluate, shape (n_re_eval, n_features). - Returns None if OCBA conditions are not met or OCBA is disabled. + # Execute one optimization run using the remaining budget; dispatcher + # selects sequential vs parallel based on `n_jobs` and returns status/result. 
+ status, result = self.execute_optimization_run( + timeout_start, + current_X0, + y0_known=y0_known_val, + max_iter_override=remaining_iter, + ) + self.restarts_results_.append(result) - Note: - OCBA is only applied when: + if status == "FINISHED": + break + elif status == "RESTART": + # Prepare for a clean restart: let get_initial_design() regenerate the full design. + current_X0 = None - * (self.repeats_initial > 1) or (self.repeats_surrogate > 1) - * self.ocba_delta > 0 - * All variances are > 0 - * At least 3 design points exist + # Find the global best result across completed restarts. + if self.restarts_results_: + best_res = min(self.restarts_results_, key=lambda r: r.fun) - Examples: - ```{python} - import numpy as np - from spotoptim import SpotOptim - opt = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1) + np.random.normal(0, 0.1, X.shape[0]), - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - repeats_surrogate=2, - ocba_delta=5, - verbose=True - ) - # Simulate optimization state (normally done in optimize()) - opt.mean_X = np.array([[1, 2], [0, 0], [2, 1]]) - opt.mean_y = np.array([5.0, 0.1, 5.0]) - opt.var_y = np.array([0.1, 0.05, 0.15]) - X_ocba = opt.apply_ocba() - # OCBA: Adding 5 re-evaluation(s). - # The following should be true: - print(X_ocba.shape[0] == 5) - ``` + if self.restart_inject_best: + # Inject the current global best into the next run's initial design. + # best_res.x is in natural scale; validate_x0 converts to internal scale + # so the injected point can be mixed with LHS samples. + self.x0 = self.validate_x0(best_res.x) + # Keep current_X0 unset so the initial design is rebuilt around the injected x0. 
+ current_X0 = None - OCBA skipped - insufficient points - ```{python} - import numpy as np - from spotoptim import SpotOptim - opt2 = SpotOptim( - fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - repeats_surrogate=2, - ocba_delta=5, - verbose=True - ) - opt2.mean_X = np.array([[1, 2], [0, 0]]) - opt2.mean_y = np.array([5.0, 0.1]) - opt2.var_y = np.array([0.1, 0.05]) - X_ocba = opt2.apply_ocba() - # Warning: OCBA skipped (need >2 points with variance > 0) - print(X_ocba is None) - ``` - """ - # OCBA: Compute optimal budget allocation for noisy functions - # This determines which existing design points should be re-evaluated - X_ocba = None - if ( - self.repeats_initial > 1 or self.repeats_surrogate > 1 - ) and self.ocba_delta > 0: - # Check conditions for OCBA (need variance > 0 and at least 3 points) - if not np.all(self.var_y > 0) and (self.mean_X.shape[0] <= 2): - if self.verbose: - print("Warning: OCBA skipped (need >2 points with variance > 0)") - elif np.all(self.var_y > 0) and (self.mean_X.shape[0] > 2): - # Get OCBA allocation - X_ocba = self.get_ocba_X( - self.mean_X, - self.mean_y, - self.var_y, - self.ocba_delta, - verbose=self.verbose, - ) - if self.verbose and X_ocba is not None: - print(f" OCBA: Adding {X_ocba.shape[0]} re-evaluation(s)") + if self.verbose: + print( + f"Restart injection: Using best found point so far as starting point (f(x)={best_res.fun:.6f})." + ) - return X_ocba + if self.seed is not None and self.n_jobs == 1: + # In sequential mode we advance the seed between restarts to diversify the LHS. + # Parallel mode increments seeds per worker during dispatch. + self.seed += 1 + # Continue loop + else: + # Should not happen + break + # Return best result + if not self.restarts_results_: + return result # Fallback - def get_ranks(self, x: np.ndarray) -> np.ndarray: - """Returns ranks of numbers within input array x. 
+ # Find best result based on 'fun' + best_result = min(self.restarts_results_, key=lambda r: r.fun) - Args: - x (ndarray): Input array. + # Merge results from all parallel runs (and sequential runs if any) + X_all_list = [res.X for res in self.restarts_results_] + y_all_list = [res.y for res in self.restarts_results_] - Returns: - ndarray: Ranks array where ranks[i] is the rank of x[i]. + # Concatenate all evaluations + self.X_ = np.vstack(X_all_list) + self.y_ = np.concatenate(y_all_list) + self.counter = len(self.y_) - Examples: - ```{python} - opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), bounds=[(-5, 5)]) - opt.get_ranks(np.array([2, 1])) - opt.get_ranks(np.array([20, 10, 100])) - ``` - """ - ts = x.argsort() - ranks = np.empty_like(ts) - ranks[ts] = np.arange(len(x)) - return ranks + # Aggregated iterations (sum of all runs) + self.n_iter_ = sum(getattr(res, "nit", 0) for res in self.restarts_results_) - def get_ocba( - self, means: np.ndarray, vars: np.ndarray, delta: int, verbose: bool = False - ) -> np.ndarray: - """Optimal Computing Budget Allocation (OCBA). - Calculates budget recommendations for given means, variances, and incremental - budget using the OCBA algorithm. + # Update best solution found + self.best_x_ = best_result.x + self.best_y_ = best_result.fun - References: - [1] Chun-Hung Chen and Loo Hay Lee: Stochastic Simulation Optimization: - An Optimal Computer Budget Allocation, pp. 49 and pp. 215 + return best_result + + def execute_optimization_run( + self, + timeout_start: float, + X0: Optional[np.ndarray] = None, + y0_known: Optional[float] = None, + max_iter_override: Optional[int] = None, + shared_best_y=None, # New arg + shared_lock=None, # New arg + ) -> Tuple[str, OptimizeResult]: + """Dispatcher for optimization run (Sequential vs Steady-State Parallel). + Depending on n_jobs, calls optimize_steady_state (n_jobs > 1) or optimize_sequential_run (n_jobs == 1). Args: - means (ndarray): Array of means. 
- vars (ndarray): Array of variances. - delta (int): Incremental budget. - verbose (bool): If True, print debug information. Defaults to False. + timeout_start (float): Start time for timeout. + X0 (Optional[np.ndarray]): Initial design points in Natural Space, shape (n_initial, n_features). + y0_known (Optional[float]): Known best value for initial design. + max_iter_override (Optional[int]): Override for maximum number of iterations. + shared_best_y (Optional[float]): Shared best value for parallel runs. + shared_lock (Optional[Lock]): Shared lock for parallel runs. Returns: - ndarray: Array of budget recommendations, or None if conditions not met. + Tuple[str, OptimizeResult]: Tuple containing status and optimization result. Examples: ```{python} + import time import numpy as np from spotoptim import SpotOptim - opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), bounds=[(-5, 5)]) - means = np.array([1, 2, 3, 4, 5]) - vars = np.array([1, 1, 9, 9, 4]) - allocations = opt.get_ocba(means, vars, 50) - allocations + from spotoptim.function import sphere + opt = SpotOptim( + fun=sphere, + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + max_iter=10, + seed=0, + n_jobs=1, # Use sequential optimization for deterministic output + verbose=True + ) + status, result = opt.execute_optimization_run(timeout_start=time.time()) + print(status) + print(result.message.splitlines()[0]) ``` """ - if np.all(vars > 0) and (means.shape[0] > 2): - n_designs = means.shape[0] - allocations = np.zeros(n_designs, np.int32) - ratios = np.zeros(n_designs, np.float64) - budget = delta - ranks = self.get_ranks(means) - best, second_best = np.argpartition(ranks, 2)[:2] - ratios[second_best] = 1.0 - select = [i for i in range(n_designs) if i not in [best, second_best]] - temp = (means[best] - means[second_best]) / (means[best] - means[select]) - ratios[select] = np.square(temp) * (vars[select] / vars[second_best]) - select = [i for i in range(n_designs) if i not in [best]] - temp = 
(np.square(ratios[select]) / vars[select]).sum() - ratios[best] = np.sqrt(vars[best] * temp) - more_runs = np.full(n_designs, True, dtype=bool) - add_budget = np.zeros(n_designs, dtype=float) - more_alloc = True - - if verbose: - print("\nIn get_ocba():") - print(f"means: {means}") - print(f"vars: {vars}") - print(f"delta: {delta}") - print(f"n_designs: {n_designs}") - print(f"Ratios: {ratios}") - print(f"Best: {best}, Second best: {second_best}") - - while more_alloc: - more_alloc = False - ratio_s = (more_runs * ratios).sum() - add_budget[more_runs] = (budget / ratio_s) * ratios[more_runs] - add_budget = np.around(add_budget).astype(int) - mask = add_budget < allocations - add_budget[mask] = allocations[mask] - more_runs[mask] = 0 - - if mask.sum() > 0: - more_alloc = True - if more_alloc: - budget = allocations.sum() + delta - budget -= (add_budget * ~more_runs).sum() - t_budget = add_budget.sum() + # Dispatch to steady-state optimizer if proper parallelization is requested + if self.n_jobs > 1: + return self.optimize_steady_state( + timeout_start, + X0, + y0_known=y0_known, + max_iter_override=max_iter_override, + ) + else: + return self.optimize_sequential_run( + timeout_start, + X0, + y0_known=y0_known, + max_iter_override=max_iter_override, + shared_best_y=shared_best_y, + shared_lock=shared_lock, + ) - # Adjust the best design to match the exact delta - # Ensure we don't go below current allocations - adjustment = allocations.sum() + delta - t_budget - add_budget[best] = max(allocations[best], add_budget[best] + adjustment) + # ==================== + # TASK_MO: + # * store_mo() + # * mo2so() + # ==================== - return add_budget - allocations - else: - return None - def get_ocba_X( - self, - X: np.ndarray, - means: np.ndarray, - vars: np.ndarray, - delta: int, - verbose: bool = False, - ) -> np.ndarray: - """Calculate OCBA allocation (by calling `get_ocba()`) and repeat input array X. 
- Used in the `optimize()` method to generate new design points based on OCBA. + def store_mo(self, y_mo: np.ndarray) -> None: + """Store multi-objective values in self.y_mo. + If multi-objective values are present (ndim==2), they are stored in self.y_mo. + New values are appended to existing ones. For single-objective problems, + self.y_mo remains None. Args: - X (ndarray): Input array to be repeated, shape (n_designs, n_features). - means (ndarray): Array of means for each design. - vars (ndarray): Array of variances for each design. - delta (int): Incremental budget. - verbose (bool): If True, print debug information. Defaults to False. - - Returns: - ndarray: Repeated array of X based on OCBA allocation, or None if - conditions not met. + y_mo (ndarray): If multi-objective, shape (n_samples, n_objectives). + If single-objective, shape (n_samples,). Examples: ```{python} import numpy as np from spotoptim import SpotOptim - opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), bounds=[(-5, 5)]) - X = np.array([[1, 2], [4, 5], [7, 8]]) - means = np.array([1.5, 35, 550]) - vars = np.array([0.5, 50, 5000]) - X_new = opt.get_ocba_X(X, means, vars, delta=5, verbose=False) - print(X_new.shape[0]) # Should have 5 additional evaluations + opt = SpotOptim( + fun=lambda X: np.column_stack([ + np.sum(X**2, axis=1), + np.sum((X-1)**2, axis=1) + ]), + bounds=[(-5, 5), (-5, 5)], + max_iter=10, + n_initial=5 + ) + y_mo_1 = np.array([[1.0, 2.0], [3.0, 4.0]]) + opt.store_mo(y_mo_1) + print(f"y_mo after first call: {opt.y_mo}") + y_mo_2 = np.array([[5.0, 6.0], [7.0, 8.0]]) + opt.store_mo(y_mo_2) + print(f"y_mo after second call: {opt.y_mo}") ``` """ - if np.all(vars > 0) and (means.shape[0] > 2): - o = self.get_ocba(means=means, vars=vars, delta=delta, verbose=verbose) - return np.repeat(X, o, axis=0) - else: - return None + # Store y_mo in self.y_mo (append new values) if multi-objective + if self.y_mo is None and y_mo.ndim == 2: + self.y_mo = y_mo + elif y_mo.ndim == 2: + 
self.y_mo = np.vstack([self.y_mo, y_mo]) - # ==================== - # TASK_SELECT: - # select_distant_points() - # select_best_cluster() - # _selection_dispatcher() - # select_new() - # suggest_next_infill_point() - # ==================== + def mo2so(self, y_mo: np.ndarray) -> np.ndarray: + """Convert multi-objective values to single-objective. + Converts multi-objective values to a single-objective value by applying a user-defined + function from `fun_mo2so`. If no user-defined function is given, the + values in the first objective column are used. - def select_distant_points( - self, X: np.ndarray, y: np.ndarray, k: int - ) -> Tuple[np.ndarray, np.ndarray]: - """Selects k points that are distant from each other using K-means clustering. - This method performs K-means clustering to find k clusters, then selects - the point closest to each cluster center. This ensures a space-filling - subset of points for surrogate model training. + This method is called after the objective function evaluation. It returns a 1D array + with the single-objective values. Args: - X (ndarray): Design points, shape (n_samples, n_features). - y (ndarray): Function values at X, shape (n_samples,). - k (int): Number of points to select. + y_mo (ndarray): If multi-objective, shape (n_samples, n_objectives). + If single-objective, shape (n_samples,). Returns: - tuple: A tuple containing: - * selected_X (ndarray): Selected design points, shape (k, n_features). - * selected_y (ndarray): Function values at selected points, shape (k,). + ndarray: Single-objective values, shape (n_samples,). 
Examples: ```{python} import numpy as np from spotoptim import SpotOptim - opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - max_surrogate_points=5) - X = np.random.rand(100, 2) - y = np.random.rand(100) - X_sel, y_sel = opt.select_distant_points(X, y, 5) - print(X_sel.shape) - ``` - """ - # Perform k-means clustering - kmeans = KMeans(n_clusters=k, random_state=0, n_init="auto").fit(X) - - # Find the closest point to each cluster center - selected_indices = [] - for center in kmeans.cluster_centers_: - distances = np.linalg.norm(X - center, axis=1) - closest_idx = np.argmin(distances) - selected_indices.append(closest_idx) - - selected_indices = np.array(selected_indices) - return X[selected_indices], y[selected_indices] - def select_best_cluster( - self, X: np.ndarray, y: np.ndarray, k: int - ) -> Tuple[np.ndarray, np.ndarray]: - """Selects all points from the cluster with the smallest mean y value. - This method performs K-means clustering and selects all points from the - cluster whose center corresponds to the best (smallest) mean objective - function value. - - Args: - X (ndarray): Design points, shape (n_samples, n_features). - y (ndarray): Function values at X, shape (n_samples,). - k (int): Number of clusters. + # Multi-objective function + def mo_fun(X): + return np.column_stack([ + np.sum(X**2, axis=1), + np.sum((X-1)**2, axis=1) + ]) - Returns: - tuple: A tuple containing: - * selected_X (ndarray): Selected design points from best cluster, shape (m, n_features). - * selected_y (ndarray): Function values at selected points, shape (m,). 
+ # Example 1: Default behavior (use first objective) + opt1 = SpotOptim( + fun=mo_fun, + bounds=[(-5, 5), (-5, 5)], + max_iter=10, + n_initial=5 + ) + y_mo = np.array([[1.0, 2.0], [3.0, 4.0]]) + y_so = opt1.mo2so(y_mo) + print(f"Single-objective (default): {y_so}") + ``` - Examples: ```{python} import numpy as np from spotoptim import SpotOptim - opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - max_surrogate_points=5, - selection_method='best') - X = np.random.rand(100, 2) - y = np.random.rand(100) - X_sel, y_sel = opt.select_best_cluster(X, y, 5) - print(f"X_sel.shape: {X_sel.shape}") - print(f"y_sel.shape: {y_sel.shape}") + # Example 2: Custom conversion function (sum of objectives) + def custom_mo2so(y_mo): + return y_mo[:, 0] + y_mo[:, 1] + + opt2 = SpotOptim( + fun=mo_fun, + bounds=[(-5, 5), (-5, 5)], + max_iter=10, + n_initial=5, + fun_mo2so=custom_mo2so + ) + y_so_custom = opt2.mo2so(y_mo) + print(f"Single-objective (custom): {y_so_custom}") ``` """ - # Perform k-means clustering - kmeans = KMeans(n_clusters=k, random_state=0, n_init="auto").fit(X) - labels = kmeans.labels_ + n, m = self.get_shape(y_mo) + self.store_mo(y_mo) - # Compute mean y for each cluster - cluster_means = [] - for cluster_idx in range(k): - cluster_y = y[labels == cluster_idx] - if len(cluster_y) == 0: - cluster_means.append(np.inf) + # Use ndim to check if multi-objective + if y_mo.ndim == 2: + if self.fun_mo2so is not None: + # Apply user-defined conversion function + y0 = self.fun_mo2so(y_mo) else: - cluster_means.append(np.mean(cluster_y)) + # Default: use first column + if y_mo.size > 0: + y0 = y_mo[:, 0] + else: + y0 = y_mo + else: + # Single-objective, return as-is + y0 = y_mo - # Find cluster with smallest mean y - best_cluster = np.argmin(cluster_means) + return y0 - # Select all points from the best cluster - mask = labels == best_cluster - return X[mask], y[mask] + # ==================== + # TASK_OCBA: + # * apply_ocba() + # * get_ranks() + 
# * get_ocba() + # * get_ocba_X() + # ==================== - def _selection_dispatcher( - self, X: np.ndarray, y: np.ndarray - ) -> Tuple[np.ndarray, np.ndarray]: - """Dispatcher for selection methods. - Depending on the value of `self.selection_method`, this method calls - the appropriate selection function to choose a subset of points for - surrogate model training when the total number of points exceeds - `self.max_surrogate_points`. - Args: - X (ndarray): Design points, shape (n_samples, n_features). - y (ndarray): Function values at X, shape (n_samples,). + def apply_ocba(self) -> Optional[np.ndarray]: + """Apply Optimal Computing Budget Allocation for noisy functions. + Determines which existing design points should be re-evaluated based on + OCBA algorithm. This method computes optimal budget allocation to improve + the quality of the estimated best design. Returns: - tuple: A tuple containing: - * selected_X (ndarray): Selected design points. - * selected_y (ndarray): Function values at selected points. + Optional[ndarray]: + Array of design points to re-evaluate, shape (n_re_eval, n_features). + Returns None if OCBA conditions are not met or OCBA is disabled. + + Note: + OCBA is only applied when: + + * (self.repeats_initial > 1) or (self.repeats_surrogate > 1) + * self.ocba_delta > 0 + * All variances are > 0 + * At least 3 design points exist + + Examples: + ```{python} + import numpy as np + from spotoptim import SpotOptim + opt = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1) + np.random.normal(0, 0.1, X.shape[0]), + bounds=[(-5, 5), (-5, 5)], + n_initial=5, + repeats_surrogate=2, + ocba_delta=5, + verbose=True + ) + # Simulate optimization state (normally done in optimize()) + opt.mean_X = np.array([[1, 2], [0, 0], [2, 1]]) + opt.mean_y = np.array([5.0, 0.1, 5.0]) + opt.var_y = np.array([0.1, 0.05, 0.15]) + X_ocba = opt.apply_ocba() + # OCBA: Adding 5 re-evaluation(s). 
+ # The following should be true: + print(X_ocba.shape[0] == 5) + ``` - Examples: + OCBA skipped - insufficient points ```{python} import numpy as np from spotoptim import SpotOptim - opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), - bounds=[(-5, 5), (-5, 5)], - max_surrogate_points=5) - X = np.random.rand(100, 2) - y = np.random.rand(100) - X_sel, y_sel = opt._selection_dispatcher(X, y) - print(X_sel.shape[0] <= 5) + opt2 = SpotOptim( + fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + repeats_surrogate=2, + ocba_delta=5, + verbose=True + ) + opt2.mean_X = np.array([[1, 2], [0, 0]]) + opt2.mean_y = np.array([5.0, 0.1]) + opt2.var_y = np.array([0.1, 0.05]) + X_ocba = opt2.apply_ocba() + # Warning: OCBA skipped (need >2 points with variance > 0) + print(X_ocba is None) ``` """ - # Resolve active max points - max_k = getattr(self, "_active_max_surrogate_points", self.max_surrogate_points) + # OCBA: Compute optimal budget allocation for noisy functions + # This determines which existing design points should be re-evaluated + X_ocba = None + if ( + self.repeats_initial > 1 or self.repeats_surrogate > 1 + ) and self.ocba_delta > 0: + # Check conditions for OCBA (need variance > 0 and at least 3 points) + if not np.all(self.var_y > 0) and (self.mean_X.shape[0] <= 2): + if self.verbose: + print("Warning: OCBA skipped (need >2 points with variance > 0)") + elif np.all(self.var_y > 0) and (self.mean_X.shape[0] > 2): + # Get OCBA allocation + X_ocba = self.get_ocba_X( + self.mean_X, + self.mean_y, + self.var_y, + self.ocba_delta, + verbose=self.verbose, + ) + if self.verbose and X_ocba is not None: + print(f" OCBA: Adding {X_ocba.shape[0]} re-evaluation(s)") - if max_k is None: - return X, y + return X_ocba - if self.selection_method == "distant": - return self.select_distant_points(X=X, y=y, k=max_k) - elif self.selection_method == "best": - return self.select_best_cluster(X=X, y=y, k=max_k) - else: - # If no valid selection method, return all points - 
return X, y - def select_new( - self, A: np.ndarray, X: np.ndarray, tolerance: float = 0 - ) -> Tuple[np.ndarray, np.ndarray]: - """Select rows from A that are not in X. - Used in suggest_next_infill_point() to avoid duplicate evaluations. + def get_ranks(self, x: np.ndarray) -> np.ndarray: + """Returns ranks of numbers within input array x. Args: - A (ndarray): Array with new values. - X (ndarray): Array with known values. - tolerance (float, optional): Tolerance value for comparison. Defaults to 0. + x (ndarray): Input array. Returns: - tuple: A tuple containing: - * ndarray: Array with unknown (new) values. - * ndarray: Array with True if value is new, otherwise False. + ndarray: Ranks array where ranks[i] is the rank of x[i]. Examples: ```{python} - import numpy as np - from spotoptim import SpotOptim - def sphere(X): - X = np.atleast_2d(X) - return np.sum(X**2, axis=1) - opt = SpotOptim(fun=sphere, bounds=[(-5, 5)]) - A = np.array([[1, 2], [3, 4], [5, 6]]) - X = np.array([[3, 4], [7, 8]]) - new_A, is_new = opt.select_new(A, X) - print("New A:", new_A) - print("Is new:", is_new) + opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), bounds=[(-5, 5)]) + opt.get_ranks(np.array([2, 1])) + opt.get_ranks(np.array([20, 10, 100])) ``` """ - if len(X) == 0: - return A, np.ones(len(A), dtype=bool) - - # Calculate distances using the configured metric - # cdist supports 'euclidean', 'minkowski', 'chebyshev', etc. 
- # Note: 'chebyshev' is closest to the previous logic but checks absolute difference on all coords - # Previous logic: np.all(np.abs(diff) <= tolerance) -> Chebyshev <= tolerance - dists = cdist(A, X, metric=self.min_tol_metric) - - # Check if min distance to any existing point is <= tolerance (duplicate) - # Duplicate if ANY existing point is within tolerance - # is_duplicate[i] is True if A[i] is close to at least one point in X - is_duplicate = np.any(dists <= tolerance, axis=1) - - ind = is_duplicate - return A[~ind], ~ind - + ts = x.argsort() + ranks = np.empty_like(ts) + ranks[ts] = np.arange(len(x)) + return ranks + def get_ocba( + self, means: np.ndarray, vars: np.ndarray, delta: int, verbose: bool = False + ) -> np.ndarray: + """Optimal Computing Budget Allocation (OCBA). + Calculates budget recommendations for given means, variances, and incremental + budget using the OCBA algorithm. - def suggest_next_infill_point(self) -> np.ndarray: - """Suggest next point to evaluate (dispatcher). - Used in both sequential and parallel optimization loops. This method orchestrates the process of generating candidate points from the acquisition function optimizer, handling any failures in the acquisition process with a fallback strategy, and ensuring that the returned point(s) are valid and ready for evaluation. - The returned point is in the Transformed and Mapped Space (Internal Optimization Space). - This means: - 1. Transformations (e.g., log, sqrt) have been applied. - 2. Dimension reduction has been applied (fixed variables removed). - Process: - 1. Try candidates from acquisition function optimizer. - 2. Handle acquisition failure (fallback). - 3. Return last attempt if all fails. + References: + [1] Chun-Hung Chen and Loo Hay Lee: Stochastic Simulation Optimization: + An Optimal Computer Budget Allocation, pp. 49 and pp. 215 + Args: + means (ndarray): Array of means. + vars (ndarray): Array of variances. + delta (int): Incremental budget. 
+ verbose (bool): If True, print debug information. Defaults to False. Returns: - ndarray: Next point(s) to evaluate in Transformed and Mapped Space. - Shape is (n_infill_points, n_features). + ndarray: Array of budget recommendations, or None if conditions not met. Examples: ```{python} import numpy as np from spotoptim import SpotOptim - def sphere(X): - X = np.atleast_2d(X) - return np.sum(X**2, axis=1) - opt = SpotOptim( - fun=sphere, - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - n_infill_points=2 - ) - # Need to initialize optimization state (X_, y_, surrogate) - # Normally done inside optimize() - np.random.seed(0) - opt.X_ = np.random.rand(10, 2) - opt.y_ = np.random.rand(10) - opt._fit_surrogate(opt.X_, opt.y_) - x_next = opt.suggest_next_infill_point() - x_next.shape + opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), bounds=[(-5, 5)]) + means = np.array([1, 2, 3, 4, 5]) + vars = np.array([1, 1, 9, 9, 4]) + allocations = opt.get_ocba(means, vars, 50) + allocations ``` """ - # 1. Optimizer candidates - candidates = [] - opt_candidates = self._try_optimizer_candidates( - n_needed=self.n_infill_points, current_batch=candidates - ) - candidates.extend(opt_candidates) - - if len(candidates) >= self.n_infill_points: - return np.vstack(candidates) - - # 2. Try fallback strategy to fill remaining slots - while len(candidates) < self.n_infill_points: - # Just try one attempt at a time but loop - # We pass current batch to avoid dups - cand, x_last = self._try_fallback_strategy( - max_attempts=10, current_batch=candidates - ) - if cand is not None: - candidates.append(cand) - else: - # Fallback failed to find unique point even after retries - # Break and fill with last attempts or just return what we have? - # If we return partial batch, we might fail downstream if code expects n points? - # Actually code should handle any number of points returned by this method? - # Or duplicate valid points? - # Warn and use duplicate if absolutely necessary? 
- if self.verbose: - print( - "Warning: Could not fill all infill points with unique candidates." - ) - break - - if len(candidates) > 0: - return np.vstack(candidates) - - # 3. Return last attempt (duplicate) if absolutely nothing found - # This returns a single point (1, d). - # Should we return n copies? - # If n_infill_points > 1, we should probably output (n, d) + if np.all(vars > 0) and (means.shape[0] > 2): + n_designs = means.shape[0] + allocations = np.zeros(n_designs, np.int32) + ratios = np.zeros(n_designs, np.float64) + budget = delta + ranks = self.get_ranks(means) + best, second_best = np.argpartition(ranks, 2)[:2] + ratios[second_best] = 1.0 + select = [i for i in range(n_designs) if i not in [best, second_best]] + temp = (means[best] - means[second_best]) / (means[best] - means[select]) + ratios[select] = np.square(temp) * (vars[select] / vars[second_best]) + select = [i for i in range(n_designs) if i not in [best]] + temp = (np.square(ratios[select]) / vars[select]).sum() + ratios[best] = np.sqrt(vars[best] * temp) + more_runs = np.full(n_designs, True, dtype=bool) + add_budget = np.zeros(n_designs, dtype=float) + more_alloc = True - if self.verbose: - print( - "Warning: Could not find unique point after optimization candidates and fallback attempts. " - "Returning last candidate (duplicate)." 
- ) + if verbose: + print("\nIn get_ocba():") + print(f"means: {means}") + print(f"vars: {vars}") + print(f"delta: {delta}") + print(f"n_designs: {n_designs}") + print(f"Ratios: {ratios}") + print(f"Best: {best}, Second best: {second_best}") - # Verify x_last is not None - if x_last is None: - # Should practically not happen - x_next = self._handle_acquisition_failure() - return x_next.reshape(1, -1) + while more_alloc: + more_alloc = False + ratio_s = (more_runs * ratios).sum() + add_budget[more_runs] = (budget / ratio_s) * ratios[more_runs] + add_budget = np.around(add_budget).astype(int) + mask = add_budget < allocations + add_budget[mask] = allocations[mask] + more_runs[mask] = 0 - # Return duplicated x_last to fill n_infill_points? OR just 1? - # Let's return 1 and let loop repeat it? - # But loop repeats based on x_next logic. - # If we return 1 point, it is treated as 1 point. - # If user asked for n_infill_points, maybe we should just return what we have (1 duplicated). + if mask.sum() > 0: + more_alloc = True + if more_alloc: + budget = allocations.sum() + delta + budget -= (add_budget * ~more_runs).sum() - return x_last.reshape(1, -1) + t_budget = add_budget.sum() + # Adjust the best design to match the exact delta + # Ensure we don't go below current allocations + adjustment = allocations.sum() + delta - t_budget + add_budget[best] = max(allocations[best], add_budget[best] + adjustment) - # ==================== - # TASK_OPTIM: - # optimize_acquisition_func() - # _optimize_run_task() - # optimize() - # execute_optimization_run() - # ==================== + return add_budget - allocations + else: + return None - def optimize_acquisition_func(self) -> np.ndarray: - """Optimize the acquisition function to find the next point to evaluate. + def get_ocba_X( + self, + X: np.ndarray, + means: np.ndarray, + vars: np.ndarray, + delta: int, + verbose: bool = False, + ) -> np.ndarray: + """Calculate OCBA allocation (by calling `get_ocba()`) and repeat input array X. 
+ Used in the `optimize()` method to generate new design points based on OCBA. + + Args: + X (ndarray): Input array to be repeated, shape (n_designs, n_features). + means (ndarray): Array of means for each design. + vars (ndarray): Array of variances for each design. + delta (int): Incremental budget. + verbose (bool): If True, print debug information. Defaults to False. Returns: - ndarray: The optimized point(s). - If acquisition_fun_return_size == 1, returns 1D array of shape (n_features,). - If acquisition_fun_return_size > 1, returns 2D array of shape (N, n_features), - where N is min(acquisition_fun_return_size, population_size). + ndarray: Repeated array of X based on OCBA allocation, or None if + conditions not met. Examples: ```{python} import numpy as np from spotoptim import SpotOptim - def sphere(X): - X = np.atleast_2d(X) - return np.sum(X**2, axis=1) - opt = SpotOptim( - fun=sphere, - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - max_iter=10, - seed=0, - ) - opt.optimize() - x_next = opt.suggest_next_infill_point() - print("Next point to evaluate:", x_next) + opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), bounds=[(-5, 5)]) + X = np.array([[1, 2], [4, 5], [7, 8]]) + means = np.array([1.5, 35, 550]) + vars = np.array([0.5, 50, 5000]) + X_new = opt.get_ocba_X(X, means, vars, delta=5, verbose=False) + print(X_new.shape[0]) # Should have 5 additional evaluations ``` """ - if self.acquisition_optimizer == "tricands": - return self._optimize_acquisition_tricands() - elif self.acquisition_optimizer == "differential_evolution": - return self._optimize_acquisition_de() - elif self.acquisition_optimizer == "de_tricands": - val = self.rng.rand() - if val < self.prob_de_tricands: - return self._optimize_acquisition_de() - else: - return self._optimize_acquisition_tricands() + if np.all(vars > 0) and (means.shape[0] > 2): + o = self.get_ocba(means=means, vars=vars, delta=delta, verbose=verbose) + return np.repeat(X, o, axis=0) else: - return 
self._optimize_acquisition_scipy() + return None + # ==================== + # TASK_SELECT: + # * select_distant_points() + # * select_best_cluster() + # * _selection_dispatcher() + # * select_new() + # * suggest_next_infill_point() + # ==================== - def _optimize_run_task( - self, - seed: int, - timeout_start: float, - X0: Optional[np.ndarray], - y0_known_val: Optional[float], - max_iter_override: Optional[int], - shared_best_y=None, # Accept shared value - shared_lock=None, # Accept shared lock - ) -> Tuple[str, OptimizeResult]: - """Helper to run a single optimization task with a specific seed. Calls _optimize_single_run. + def select_distant_points( + self, X: np.ndarray, y: np.ndarray, k: int + ) -> Tuple[np.ndarray, np.ndarray]: + """Selects k points that are distant from each other using K-means clustering. + This method performs K-means clustering to find k clusters, then selects + the point closest to each cluster center. This ensures a space-filling + subset of points for surrogate model training. Args: - seed (int): Seed for this run. - timeout_start (float): Start time for timeout. - X0 (Optional[np.ndarray]): Initial design points in Natural Space, shape (n_initial, n_features). - y0_known_val (Optional[float]): Known best value for initial design. - max_iter_override (Optional[int]): Override for maximum number of iterations. - shared_best_y (Optional[float]): Shared best value for parallel runs. - shared_lock (Optional[Lock]): Shared lock for parallel runs. + X (ndarray): Design points, shape (n_samples, n_features). + y (ndarray): Function values at X, shape (n_samples,). + k (int): Number of points to select. Returns: - Tuple[str, OptimizeResult]: Tuple containing status and optimization result. + tuple: A tuple containing: + * selected_X (ndarray): Selected design points, shape (k, n_features). + * selected_y (ndarray): Function values at selected points, shape (k,). 
+ + Examples: + ```{python} + import numpy as np + from spotoptim import SpotOptim + opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + max_surrogate_points=5) + X = np.random.rand(100, 2) + y = np.random.rand(100) + X_sel, y_sel = opt.select_distant_points(X, y, 5) + print(X_sel.shape) + ``` """ - # Set the seed for this run - self.seed = seed - self.set_seed() + # Perform k-means clustering + kmeans = KMeans(n_clusters=k, random_state=0, n_init="auto").fit(X) - # Re-initialize LHS sampler with new seed to ensure diversity in initial design - if hasattr(self, "n_dim"): - self.lhs_sampler = LatinHypercube(d=self.n_dim, rng=self.seed) + # Find the closest point to each cluster center + selected_indices = [] + for center in kmeans.cluster_centers_: + distances = np.linalg.norm(X - center, axis=1) + closest_idx = np.argmin(distances) + selected_indices.append(closest_idx) - return self._optimize_single_run( - timeout_start, - X0, - y0_known=y0_known_val, - max_iter_override=max_iter_override, - shared_best_y=shared_best_y, - shared_lock=shared_lock, - ) + selected_indices = np.array(selected_indices) + return X[selected_indices], y[selected_indices] - def optimize(self, X0: Optional[np.ndarray] = None) -> OptimizeResult: - """Run the optimization process. The optimization terminates when either the total function evaluations reach - `max_iter` (including initial design), or the runtime exceeds max_time minutes. Input/Output spaces are - * Input `X0`: Expected in Natural Space (original scale, physical units). - * Output `result.x`: Returned in Natural Space. - * Output `result.X`: Returned in Natural Space. - * Internal Optimization: Performed in Transformed and Mapped Space. + def select_best_cluster( + self, X: np.ndarray, y: np.ndarray, k: int + ) -> Tuple[np.ndarray, np.ndarray]: + """Selects all points from the cluster with the smallest mean y value. 
+ This method performs K-means clustering and selects all points from the + cluster whose center corresponds to the best (smallest) mean objective + function value. Args: - X0 (ndarray, optional): Initial design points in Natural Space, shape (n_initial, n_features). - If None, generates space-filling design. Defaults to None. + X (ndarray): Design points, shape (n_samples, n_features). + y (ndarray): Function values at X, shape (n_samples,). + k (int): Number of clusters. Returns: - OptimizeResult: Optimization result with fields: - * x: best point found in Natural Space - * fun: best function value - * nfev: number of function evaluations (including initial design) - * nit: number of sequential optimization iterations (after initial design) - * success: whether optimization succeeded - * message: termination message indicating reason for stopping, including statistics (function value, iterations, evaluations) - * X: all evaluated points in Natural Space - * y: all function values + tuple: A tuple containing: + * selected_X (ndarray): Selected design points from best cluster, shape (m, n_features). + * selected_y (ndarray): Function values at selected points, shape (m,). Examples: ```{python} import numpy as np from spotoptim import SpotOptim - from spotoptim.function import sphere - opt = SpotOptim( - fun=sphere, - bounds=[(-5, 5), (-5, 5)], - n_initial=5, - max_iter=10, - seed=0, - x0=np.array([0.1, -0.1]), - verbose=True - ) - result = opt.optimize() - print(result.message.splitlines()[0]) - print("Best point:", result.x) - print("Best value:", result.fun) + opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + max_surrogate_points=5, + selection_method='best') + X = np.random.rand(100, 2) + y = np.random.rand(100) + X_sel, y_sel = opt.select_best_cluster(X, y, 5) + print(f"X_sel.shape: {X_sel.shape}") + print(f"y_sel.shape: {y_sel.shape}") ``` """ - # Track results across restarts for final aggregation. 
- self.restarts_results_ = [] - # Capture start time for timeout enforcement. - timeout_start = time.time() - - # Initial run state. - current_X0 = X0 - status = "START" + # Perform k-means clustering + kmeans = KMeans(n_clusters=k, random_state=0, n_init="auto").fit(X) + labels = kmeans.labels_ - while True: - # Get best result so far if we have results - best_res = ( - min(self.restarts_results_, key=lambda x: x.fun) - if self.restarts_results_ - else None - ) + # Compute mean y for each cluster + cluster_means = [] + for cluster_idx in range(k): + cluster_y = y[labels == cluster_idx] + if len(cluster_y) == 0: + cluster_means.append(np.inf) + else: + cluster_means.append(np.mean(cluster_y)) - # Compute injected best value for restarts, then run one optimization cycle. - # y0_known_val carries the current global best objective so the next - # run can skip re-evaluating that known point when restart injection is on. - y0_known_val = ( - best_res.fun - if ( - status == "RESTART" - and self.restart_inject_best - and self.restarts_results_ - ) - else None - ) + # Find cluster with smallest mean y + best_cluster = np.argmin(cluster_means) - # Calculate remaining budget - total_evals_so_far = sum(len(r.y) for r in self.restarts_results_) - remaining_iter = self.max_iter - total_evals_so_far + # Select all points from the best cluster + mask = labels == best_cluster + return X[mask], y[mask] - # If we don't have enough budget for at least initial design (or some minimal amount), stop - if remaining_iter < self.n_initial: - if self.verbose: - print("Global budget exhausted. Stopping restarts.") - break + def _selection_dispatcher( + self, X: np.ndarray, y: np.ndarray + ) -> Tuple[np.ndarray, np.ndarray]: + """Dispatcher for selection methods. 
+ Depending on the value of `self.selection_method`, this method calls + the appropriate selection function to choose a subset of points for + surrogate model training when the total number of points exceeds + `self.max_surrogate_points`. - # Execute one optimization run using the remaining budget; dispatcher - # selects sequential vs parallel based on `n_jobs` and returns status/result. - status, result = self.execute_optimization_run( - timeout_start, - current_X0, - y0_known=y0_known_val, - max_iter_override=remaining_iter, - ) - self.restarts_results_.append(result) + Args: + X (ndarray): Design points, shape (n_samples, n_features). + y (ndarray): Function values at X, shape (n_samples,). - if status == "FINISHED": - break - elif status == "RESTART": - # Prepare for a clean restart: let get_initial_design() regenerate the full design. - current_X0 = None + Returns: + tuple: A tuple containing: + * selected_X (ndarray): Selected design points. + * selected_y (ndarray): Function values at selected points. - # Find the global best result across completed restarts. - if self.restarts_results_: - best_res = min(self.restarts_results_, key=lambda r: r.fun) + Examples: + ```{python} + import numpy as np + from spotoptim import SpotOptim + opt = SpotOptim(fun=lambda X: np.sum(X**2, axis=1), + bounds=[(-5, 5), (-5, 5)], + max_surrogate_points=5) + X = np.random.rand(100, 2) + y = np.random.rand(100) + X_sel, y_sel = opt._selection_dispatcher(X, y) + print(X_sel.shape[0] <= 5) + ``` + """ + # Resolve active max points + max_k = getattr(self, "_active_max_surrogate_points", self.max_surrogate_points) - if self.restart_inject_best: - # Inject the current global best into the next run's initial design. - # best_res.x is in natural scale; validate_x0 converts to internal scale - # so the injected point can be mixed with LHS samples. - self.x0 = self.validate_x0(best_res.x) - # Keep current_X0 unset so the initial design is rebuilt around the injected x0. 
- current_X0 = None + if max_k is None: + return X, y - if self.verbose: - print( - f"Restart injection: Using best found point so far as starting point (f(x)={best_res.fun:.6f})." - ) + if self.selection_method == "distant": + return self.select_distant_points(X=X, y=y, k=max_k) + elif self.selection_method == "best": + return self.select_best_cluster(X=X, y=y, k=max_k) + else: + # If no valid selection method, return all points + return X, y - if self.seed is not None and self.n_jobs == 1: - # In sequential mode we advance the seed between restarts to diversify the LHS. - # Parallel mode increments seeds per worker during dispatch. - self.seed += 1 - # Continue loop - else: - # Should not happen - break + def select_new( + self, A: np.ndarray, X: np.ndarray, tolerance: float = 0 + ) -> Tuple[np.ndarray, np.ndarray]: + """Select rows from A that are not in X. + Used in suggest_next_infill_point() to avoid duplicate evaluations. - # Return best result - if not self.restarts_results_: - return result # Fallback + Args: + A (ndarray): Array with new values. + X (ndarray): Array with known values. + tolerance (float, optional): Tolerance value for comparison. Defaults to 0. - # Find best result based on 'fun' - best_result = min(self.restarts_results_, key=lambda r: r.fun) + Returns: + tuple: A tuple containing: + * ndarray: Array with unknown (new) values. + * ndarray: Array with True if value is new, otherwise False. 
- # Merge results from all parallel runs (and sequential runs if any) - X_all_list = [res.X for res in self.restarts_results_] - y_all_list = [res.y for res in self.restarts_results_] + Examples: + ```{python} + import numpy as np + from spotoptim import SpotOptim + def sphere(X): + X = np.atleast_2d(X) + return np.sum(X**2, axis=1) + opt = SpotOptim(fun=sphere, bounds=[(-5, 5)]) + A = np.array([[1, 2], [3, 4], [5, 6]]) + X = np.array([[3, 4], [7, 8]]) + new_A, is_new = opt.select_new(A, X) + print("New A:", new_A) + print("Is new:", is_new) + ``` + """ + if len(X) == 0: + return A, np.ones(len(A), dtype=bool) - # Concatenate all evaluations - self.X_ = np.vstack(X_all_list) - self.y_ = np.concatenate(y_all_list) - self.counter = len(self.y_) + # Calculate distances using the configured metric + # cdist supports 'euclidean', 'minkowski', 'chebyshev', etc. + # Note: 'chebyshev' is closest to the previous logic but checks absolute difference on all coords + # Previous logic: np.all(np.abs(diff) <= tolerance) -> Chebyshev <= tolerance + dists = cdist(A, X, metric=self.min_tol_metric) - # Aggregated iterations (sum of all runs) - self.n_iter_ = sum(getattr(res, "nit", 0) for res in self.restarts_results_) + # Check if min distance to any existing point is <= tolerance (duplicate) + # Duplicate if ANY existing point is within tolerance + # is_duplicate[i] is True if A[i] is close to at least one point in X + is_duplicate = np.any(dists <= tolerance, axis=1) - # Update best solution found - self.best_x_ = best_result.x - self.best_y_ = best_result.fun + ind = is_duplicate + return A[~ind], ~ind - return best_result - def execute_optimization_run( - self, - timeout_start: float, - X0: Optional[np.ndarray] = None, - y0_known: Optional[float] = None, - max_iter_override: Optional[int] = None, - shared_best_y=None, # New arg - shared_lock=None, # New arg - ) -> Tuple[str, OptimizeResult]: - """Dispatcher for optimization run (Sequential vs Steady-State Parallel). 
- Depending on n_jobs, calls optimize_steady_state (n_jobs > 1) or optimize_sequential_run (n_jobs == 1). - Args: - timeout_start (float): Start time for timeout. - X0 (Optional[np.ndarray]): Initial design points in Natural Space, shape (n_initial, n_features). - y0_known (Optional[float]): Known best value for initial design. - max_iter_override (Optional[int]): Override for maximum number of iterations. - shared_best_y (Optional[float]): Shared best value for parallel runs. - shared_lock (Optional[Lock]): Shared lock for parallel runs. + def suggest_next_infill_point(self) -> np.ndarray: + """Suggest next point to evaluate (dispatcher). + Used in both sequential and parallel optimization loops. This method orchestrates the process of generating candidate points from the acquisition function optimizer, handling any failures in the acquisition process with a fallback strategy, and ensuring that the returned point(s) are valid and ready for evaluation. + The returned point is in the Transformed and Mapped Space (Internal Optimization Space). + This means: + 1. Transformations (e.g., log, sqrt) have been applied. + 2. Dimension reduction has been applied (fixed variables removed). + Process: + 1. Try candidates from acquisition function optimizer. + 2. Handle acquisition failure (fallback). + 3. Return last attempt if all fails. + Returns: - Tuple[str, OptimizeResult]: Tuple containing status and optimization result. + ndarray: Next point(s) to evaluate in Transformed and Mapped Space. + Shape is (n_infill_points, n_features). 
Examples: ```{python} - import time import numpy as np from spotoptim import SpotOptim - from spotoptim.function import sphere + def sphere(X): + X = np.atleast_2d(X) + return np.sum(X**2, axis=1) opt = SpotOptim( fun=sphere, bounds=[(-5, 5), (-5, 5)], n_initial=5, - max_iter=10, - seed=0, - n_jobs=1, # Use sequential optimization for deterministic output - verbose=True + n_infill_points=2 ) - status, result = opt.execute_optimization_run(timeout_start=time.time()) - print(status) - print(result.message.splitlines()[0]) + # Need to initialize optimization state (X_, y_, surrogate) + # Normally done inside optimize() + np.random.seed(0) + opt.X_ = np.random.rand(10, 2) + opt.y_ = np.random.rand(10) + opt._fit_surrogate(opt.X_, opt.y_) + x_next = opt.suggest_next_infill_point() + x_next.shape ``` """ + # 1. Optimizer candidates + candidates = [] + opt_candidates = self._try_optimizer_candidates( + n_needed=self.n_infill_points, current_batch=candidates + ) + candidates.extend(opt_candidates) - # Dispatch to steady-state optimizer if proper parallelization is requested - if self.n_jobs > 1: - return self.optimize_steady_state( - timeout_start, - X0, - y0_known=y0_known, - max_iter_override=max_iter_override, + if len(candidates) >= self.n_infill_points: + return np.vstack(candidates) + + # 2. Try fallback strategy to fill remaining slots + while len(candidates) < self.n_infill_points: + # Just try one attempt at a time but loop + # We pass current batch to avoid dups + cand, x_last = self._try_fallback_strategy( + max_attempts=10, current_batch=candidates ) - else: - return self.optimize_sequential_run( - timeout_start, - X0, - y0_known=y0_known, - max_iter_override=max_iter_override, - shared_best_y=shared_best_y, - shared_lock=shared_lock, + if cand is not None: + candidates.append(cand) + else: + # Fallback failed to find unique point even after retries + # Break and fill with last attempts or just return what we have? 
+ # If we return partial batch, we might fail downstream if code expects n points? + # Actually code should handle any number of points returned by this method? + # Or duplicate valid points? + # Warn and use duplicate if absolutely necessary? + if self.verbose: + print( + "Warning: Could not fill all infill points with unique candidates." + ) + break + + if len(candidates) > 0: + return np.vstack(candidates) + + # 3. Return last attempt (duplicate) if absolutely nothing found + # This returns a single point (1, d). + # Should we return n copies? + # If n_infill_points > 1, we should probably output (n, d) + + if self.verbose: + print( + "Warning: Could not find unique point after optimization candidates and fallback attempts. " + "Returning last candidate (duplicate)." ) + # Verify x_last is not None + if x_last is None: + # Should practically not happen + x_next = self._handle_acquisition_failure() + return x_next.reshape(1, -1) + + # Return duplicated x_last to fill n_infill_points? OR just 1? + # Let's return 1 and let loop repeat it? + # But loop repeats based on x_next logic. + # If we return 1 point, it is treated as 1 point. + # If user asked for n_infill_points, maybe we should just return what we have (1 duplicated). + + return x_last.reshape(1, -1) + + # ==================== # TASK_OPTIM_SEQ: - # determine_termination() - # apply_penalty_NA() - # update_best_main_loop() - # handle_NA_new_points() - # optimize_sequential_run() - # _initialize_run() - # update_repeats_infill_points() - # _run_sequential_loop() + # * determine_termination() + # * apply_penalty_NA() + # * update_best_main_loop() + # * handle_NA_new_points() + # * optimize_sequential_run() + # * _initialize_run() + # * update_repeats_infill_points() + # * _run_sequential_loop() # ==================== - - def determine_termination(self, timeout_start: float) -> str: """Determine termination reason for optimization. 
Checks the termination conditions and returns an appropriate message @@ -5324,8 +5318,8 @@ def _run_sequential_loop( # ==================== # TASK_OPTIM_PARALLEL: - # _update_storage_steady() - # optimize_steady_state() + # * _update_storage_steady() + # * optimize_steady_state() # ==================== def _update_storage_steady(self, x, y): @@ -5797,13 +5791,13 @@ def _batch_ready() -> bool: # ==================== # TASK_STATS: - # init_storage() - # update_storage() - # update_stats() - # update_success_rate() - # get_success_rate() - # aggregate_mean_var() - # get_best_hyperparameters + # * init_storage() + # * update_storage() + # * update_stats() + # * update_success_rate() + # * get_success_rate() + # * aggregate_mean_var() + # * get_best_hyperparameters # ==================== def get_best_hyperparameters( @@ -6174,20 +6168,20 @@ def aggregate_mean_var( # ==================== # TASK_RESULTS: - # save_result() - # load_result() - # save_experiment() - # load_experiment() - # get_result_filename() - # get_experiment_filename() - # print_results() - # print_best() - # get_results_table() - # get_design_table() - # gen_design_table() - # get_importance() - # sensitivity_spearman() - # get_stars() + # * save_result() + # * load_result() + # * save_experiment() + # * load_experiment() + # * get_result_filename() + # * get_experiment_filename() + # * print_results() + # * print_best() + # * get_results_table() + # * get_design_table() + # * gen_design_table() + # * get_importance() + # * sensitivity_spearman() + # * get_stars() # ==================== def save_result( @@ -7223,13 +7217,13 @@ def test_func(X): # ==================== # TASK_TENSORBOARD: - # _clen_tensorboard_logs() - # _init_tensorboard_writer() - # _write_tensorboard_scalars() - # _write_tensorboard_hparams() - # _close_tensorboard_writer() - # init_tensorboard() - # _close_and_del_tensorboard_writer() + # * _clen_tensorboard_logs() + # * _init_tensorboard_writer() + # * _write_tensorboard_scalars() + 
# * _write_tensorboard_hparams() + # * _close_tensorboard_writer() + # * init_tensorboard() + # * _close_and_del_tensorboard_writer() # ==================== def _clean_tensorboard_logs(self) -> None: @@ -7465,12 +7459,12 @@ def _close_and_del_tensorboard_writer(self) -> None: # ==================== # TASK_PLOT: - # plot_progress() - # plot_surrogate() - # plot_important_hyperparameter_contour() - # _plot_surrogate_with_factors() - # plot_importance() - # plot_parameter_scatter() + # * plot_progress() + # * plot_surrogate() + # * plot_important_hyperparameter_contour() + # * _plot_surrogate_with_factors() + # * plot_importance() + # * plot_parameter_scatter() # ==================== def plot_progress(