From 4b08b6bf0931d24088ae9befd45197d36b7f22bd Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Wed, 1 Apr 2026 12:26:31 -0400 Subject: [PATCH 1/7] Initial framework for state save checkpointing --- include/core/Layer.hpp | 28 ++++-- include/core/NgenSimulation.hpp | 16 +++- .../catchment/Bmi_Formulation.hpp | 5 ++ include/simulation_time/Simulation_Time.hpp | 8 ++ include/state_save_restore/File_Per_Unit.hpp | 4 +- .../state_save_restore/State_Save_Restore.hpp | 51 +++++++++-- .../state_save_restore/State_Save_Utils.hpp | 4 +- src/NGen.cpp | 20 ++++- src/core/CMakeLists.txt | 1 + src/core/Layer.cpp | 32 +++++-- src/core/NgenSimulation.cpp | 85 +++++++++++++----- .../catchment/Bmi_Module_Formulation.cpp | 4 +- .../catchment/Bmi_Multi_Formulation.cpp | 8 +- src/state_save_restore/File_Per_Unit.cpp | 48 ++++++---- src/state_save_restore/State_Save_Restore.cpp | 87 ++++++++++++++++--- 15 files changed, 317 insertions(+), 84 deletions(-) diff --git a/include/core/Layer.hpp b/include/core/Layer.hpp index 643f1b6970..33c44fb473 100644 --- a/include/core/Layer.hpp +++ b/include/core/Layer.hpp @@ -8,7 +8,9 @@ #include "Simulation_Time.hpp" #include "State_Exception.hpp" #include "geojson/FeatureBuilder.hpp" +#include "state_save_restore/State_Save_Restore.hpp" #include +#include namespace hy_features { @@ -16,9 +18,6 @@ namespace hy_features class HY_Features_MPI; } -class State_Snapshot_Saver; -class State_Snapshot_Loader; - namespace ngen { @@ -113,10 +112,20 @@ namespace ngen std::unordered_map &nexus_indexes, int current_step); - virtual void save_state_snapshot(std::shared_ptr snapshot_saver); - virtual void load_state_snapshot(std::shared_ptr snapshot_loader); + /** + * Save the current state including metatdata related to current layer times + */ + virtual void save_checkpoint(std::shared_ptr snapshot_saver); + /** + * Save the current state excluding metatdata related to current layer times + */ + virtual void save_end_of_run(std::shared_ptr snapshot_saver); + virtual void load_checkpoint(std::shared_ptr snapshot_loader); virtual void load_hot_start(std::shared_ptr snapshot_loader); + std::string unit_name() const; + virtual std::vector required_checkpoint_units() const; + protected: const LayerDescription description; @@ -127,7 +136,14 @@ namespace ngen feature_type& features; //TODO is this really required at the top level? or can this be moved to SurfaceLayer? const geojson::GeoJSON catchment_data; - long output_time_index; + long output_time_index; + + // Serialization template will be defined and instantiated in the .cpp file + friend class boost::serialization::access; + template + void serialize(Archive& ar, const unsigned int version) { + ar & this->simulation_time; + } }; } diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 3189045edc..7089844234 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -24,6 +24,8 @@ class State_Snapshot_Loader; #include #include +#include + // Contains all of the dynamic state and logic to run a NextGen hydrologic simulation class NgenSimulation { @@ -55,6 +57,9 @@ class NgenSimulation */ void run_catchments(); + // Alternative method of running catchments that includes saving checkpoints at a desired frequency. + void run_catchments(std::shared_ptr checkpoint_saver, int frequency); + // Tear down of any items stored on the NgenSimulation object that could throw errors and, thus, should be kept separate from the deconstructor. void finalize(); @@ -69,8 +74,6 @@ class NgenSimulation size_t get_num_output_times() const; std::string get_timestamp_for_step(int step) const; - void save_state_snapshot(std::shared_ptr snapshot_saver); - void load_state_snapshot(std::shared_ptr snapshot_loader); /** * Saves a snapshot state that's intended to be run at the end of a simulation. * @@ -79,10 +82,16 @@ class NgenSimulation void save_end_of_run(std::shared_ptr snapshot_saver); // Load a snapshot of the end of a previous run. This will create a T-Route python adapter if the loader finds a unit for it and the config path is not empty. void load_hot_start(std::shared_ptr snapshot_loader, const std::string &t_route_config_file_with_path); + // Load a snapshot of a checkpoint from a previous run. This will create a T-Route python adapter if the loader finds a unit for it (most likely to happen if the checkpoint is derived from ) + void load_checkpoint(std::shared_ptr checkpoint_loader); + + std::vector required_checkpoint_units() const; private: void advance_models_one_output_step(); + void save_checkpoint(std::shared_ptr snapshot_saver); + int simulation_step_; std::shared_ptr sim_time_; @@ -106,8 +115,9 @@ class NgenSimulation int mpi_num_procs_; // Serialization template will be defined and instantiated in the .cpp file + friend class boost::serialization::access; template - void serialize(Archive& ar); + void serialize(Archive& ar, const unsigned int version); }; #endif diff --git a/include/realizations/catchment/Bmi_Formulation.hpp b/include/realizations/catchment/Bmi_Formulation.hpp index 91236fea42..da6d72bc29 100644 --- a/include/realizations/catchment/Bmi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Formulation.hpp @@ -95,6 +95,11 @@ namespace realization { */ virtual void load_hot_start(std::shared_ptr loader) = 0; + // Unit name used for identifying a save state ID + inline std::string save_state_unit_name() const { + return this->get_id(); + } + /** * Convert a time value from the model to an epoch time in seconds. * diff --git a/include/simulation_time/Simulation_Time.hpp b/include/simulation_time/Simulation_Time.hpp index d2ea4969d4..0c44426e16 100644 --- a/include/simulation_time/Simulation_Time.hpp +++ b/include/simulation_time/Simulation_Time.hpp @@ -8,6 +8,8 @@ #include #include +#include + /** * @brief simulation_time_params providing configuration information for simulation time period. */ @@ -168,6 +170,12 @@ class Simulation_Time private: + friend class boost::serialization::access; + template + void serialize(Archive& ar, const unsigned int version) { + ar & this->current_date_time_epoch; + } + int total_output_times; int simulation_total_time_seconds; int output_interval_seconds; diff --git a/include/state_save_restore/File_Per_Unit.hpp b/include/state_save_restore/File_Per_Unit.hpp index faec8d966a..aa95450915 100644 --- a/include/state_save_restore/File_Per_Unit.hpp +++ b/include/state_save_restore/File_Per_Unit.hpp @@ -11,7 +11,7 @@ class File_Per_Unit_Saver : public State_Saver std::shared_ptr initialize_snapshot(State_Durability durability) override; - std::shared_ptr initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) override; + std::shared_ptr initialize_checkpoint_snapshot(int step, State_Durability durability) override; void finalize() override; @@ -30,7 +30,7 @@ class File_Per_Unit_Loader : public State_Loader std::shared_ptr initialize_snapshot() override; - std::shared_ptr initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) override; + std::shared_ptr initialize_checkpoint_snapshot(const std::vector &required_units, int *const checkpoint_step) override; private: std::string dir_path_; }; diff --git a/include/state_save_restore/State_Save_Restore.hpp b/include/state_save_restore/State_Save_Restore.hpp index 57a88b5f3e..5bb13677af 100644 --- a/include/state_save_restore/State_Save_Restore.hpp +++ b/include/state_save_restore/State_Save_Restore.hpp @@ -5,6 +5,9 @@ #include #include +#include +#include +#include #include #include @@ -12,6 +15,7 @@ #include #include +#include "vecbuf.hpp" #include "State_Save_Utils.hpp" class State_Saver; @@ -49,15 +53,22 @@ class State_Save_Config */ std::unique_ptr hot_start() const; + std::unique_ptr checkpoint_loader() const; + + bool has_checkpoint_saver() const; + + std::shared_ptr checkpoint_saver(int *const frequency) const; + struct instance { - instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing); + instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing, boost::optional frequency); State_Save_Direction direction_; State_Save_Mechanism mechanism_; State_Save_When timing_; std::string label_; std::string path_; + int frequency_; std::string mechanism_string() const; }; @@ -69,8 +80,6 @@ class State_Save_Config class State_Saver { public: - using snapshot_time_t = std::chrono::time_point; - // Flag type to indicate whether state saving needs to ensure // stability of saved data wherever it is stored before returning // success @@ -79,8 +88,6 @@ class State_Saver State_Saver() = default; virtual ~State_Saver() = default; - static snapshot_time_t snapshot_time_now(); - /** * Return an object suitable for saving a simulation state as of a * particular moment in time, @param epoch @@ -91,7 +98,7 @@ class State_Saver */ virtual std::shared_ptr initialize_snapshot(State_Durability durability) = 0; - virtual std::shared_ptr initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) = 0; + virtual std::shared_ptr initialize_checkpoint_snapshot(int step, State_Durability durability) = 0; /** * Execute any logic necessary to cleanly finish usage, and @@ -114,6 +121,18 @@ class State_Snapshot_Saver */ virtual void save_unit(std::string const& unit_name, boost::span data) = 0; + /** + * Capture the data from a single unit that can be serialized with a Boost binary_oarchive + */ + template + void archive_unit(const std::string &unit_name, T *item) { + vecbuf buffer; + boost::archive::binary_oarchive archive(buffer); + archive << (*item); + boost::span data(buffer.data(), buffer.size()); + this->save_unit(unit_name, data); + } + /** * Execute logic to complete the saving process * @@ -142,7 +161,13 @@ class State_Loader */ virtual std::shared_ptr initialize_snapshot() = 0; - virtual std::shared_ptr initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) = 0; + /** + * Return an object suitable for loading a checkpoint simulation state with all the required units. + * + * @param required_units Vector of all units that are required for a simulation state to be considered complete. + * @param checkpoint_step Pointer that will be set to the step number of the valid state. + */ + virtual std::shared_ptr initialize_checkpoint_snapshot(const std::vector &required_units, int *const checkpoint_step) = 0; /** * Execute any logic necessary to cleanly finish usage, and @@ -169,6 +194,18 @@ class State_Snapshot_Loader */ virtual void load_unit(const std::string &unit_name, std::vector &data) = 0; + /** + * Load unit data and immediately dearchive it with a Boost binary_iarchive + */ + template + void dearchive_unit(const std::string &unit_name, T *item) { + std::vector buffer; + this->load_unit(unit_name, buffer); + membuf data(buffer.data(), buffer.size()); + boost::archive::binary_iarchive archive(data); + archive >> (*item); + } + /** * Execute logic to complete the saving process * diff --git a/include/state_save_restore/State_Save_Utils.hpp b/include/state_save_restore/State_Save_Utils.hpp index 9713b660af..c3b6064db8 100644 --- a/include/state_save_restore/State_Save_Utils.hpp +++ b/include/state_save_restore/State_Save_Utils.hpp @@ -23,8 +23,8 @@ enum class State_Save_Mechanism { enum class State_Save_When { None = 0, EndOfRun, - FirstOfMonth, - StartOfRun + StartOfRun, + Checkpoint }; #endif diff --git a/src/NGen.cpp b/src/NGen.cpp index b4b8d14978..0c277b7c0d 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -710,7 +710,25 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { } } - simulation->run_catchments(); + int checkpoint_step = 0; + { // optionally load a checkpoint if configured + auto checkpoint_loader = state_saving_config.checkpoint_loader(); + if (checkpoint_loader) { + LOG(LogLevel::INFO, "Loading checkpoint data from prior snapshot."); + const std::vector required_units = simulation->required_checkpoint_units(); + std::shared_ptr snapshot_loader + = checkpoint_loader->initialize_checkpoint_snapshot(required_units, &checkpoint_step); + simulation->load_checkpoint(snapshot_loader); + } + } + + if (state_saving_config.has_checkpoint_saver()) { + int checkpoint_frequency; + std::shared_ptr checkpoint_saver = state_saving_config.checkpoint_saver(&checkpoint_frequency); + simulation->run_catchments(checkpoint_saver, checkpoint_frequency); + } else { + simulation->run_catchments(); + } #if NGEN_WITH_MPI MPI_Barrier(MPI_COMM_WORLD); diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 8793567a65..5b42bf9181 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -9,6 +9,7 @@ target_link_libraries(core PUBLIC target_link_libraries(core PRIVATE NGen::parallel + NGen::state_save_restore ) target_link_libraries(core PUBLIC diff --git a/src/core/Layer.cpp b/src/core/Layer.cpp index 432b918aa3..97d5cedcf3 100644 --- a/src/core/Layer.cpp +++ b/src/core/Layer.cpp @@ -94,10 +94,30 @@ void ngen::Layer::update_models(boost::span catchment_outflows, } } -void ngen::Layer::save_state_snapshot(std::shared_ptr snapshot_saver) +std::string ngen::Layer::unit_name() const { + return "lyr-" + std::to_string(this->get_id()); +} + +std::vector ngen::Layer::required_checkpoint_units() const { + std::vector units; + units.push_back(this->unit_name()); + for (auto const& id : processing_units) { + auto r = features.catchment_at(id); + auto r_c = std::dynamic_pointer_cast(r); + units.push_back(r_c->save_state_unit_name()); + } + return units; +} + +void ngen::Layer::save_checkpoint(std::shared_ptr snapshot_saver) { - // XXX Handle any of this class's own state as a meta-data unit + snapshot_saver->archive_unit(this->unit_name(), this); + // assume save_end_of_run ist just the BMI save + this->save_end_of_run(snapshot_saver); +} +void ngen::Layer::save_end_of_run(std::shared_ptr snapshot_saver) +{ for (auto const& id : processing_units) { auto r = features.catchment_at(id); auto r_c = std::dynamic_pointer_cast(r); @@ -105,10 +125,9 @@ void ngen::Layer::save_state_snapshot(std::shared_ptr snap } } -void ngen::Layer::load_state_snapshot(std::shared_ptr snapshot_loader) +void ngen::Layer::load_checkpoint(std::shared_ptr snapshot_loader) { - // XXX Handle any of this class's own state as a meta-data unit - + snapshot_loader->dearchive_unit(this->unit_name(), this); for (auto const& id : processing_units) { auto r = features.catchment_at(id); auto r_c = std::dynamic_pointer_cast(r); @@ -118,8 +137,7 @@ void ngen::Layer::load_state_snapshot(std::shared_ptr sna void ngen::Layer::load_hot_start(std::shared_ptr snapshot_loader) { - // XXX Handle any of this class's own state as a meta-data unit - + // no Layer metadata needs to load for this for (auto const& id : processing_units) { auto r = features.catchment_at(id); auto r_c = std::dynamic_pointer_cast(r); diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index 9addca80d4..156a252b2d 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -8,8 +8,16 @@ #include "HY_Features.hpp" #endif +#include "state_save_restore/vecbuf.hpp" #include "state_save_restore/State_Save_Utils.hpp" #include "state_save_restore/State_Save_Restore.hpp" + +#include +#include +#include +#include +#include + #include "parallel_utils.h" namespace { @@ -57,6 +65,27 @@ void NgenSimulation::run_catchments() } } +void NgenSimulation::run_catchments(std::shared_ptr checkpoint_saver, int frequency) { + int num_times = get_num_output_times(); + + for (; simulation_step_ < num_times; simulation_step_++) { + // Make room for this output step's results + catchment_outflows_.resize(catchment_outflows_.size() + catchment_indexes_.size(), 0.0); + nexus_downstream_flows_.resize(nexus_downstream_flows_.size() + nexus_indexes_.size(), 0.0); + + advance_models_one_output_step(); + + if (simulation_step_ + 1 < num_times) { + sim_time_->advance_timestep(); + } + + if (simulation_step_ != 0 && simulation_step_ % frequency == 0) { + auto step_saver = checkpoint_saver->initialize_checkpoint_snapshot(simulation_step_, State_Saver::State_Durability::strict); + this->save_checkpoint(step_saver); + } + } +} + void NgenSimulation::finalize() { #if NGEN_WITH_ROUTING if (this->py_troute_) { @@ -120,20 +149,10 @@ void NgenSimulation::advance_models_one_output_step() } -void NgenSimulation::save_state_snapshot(std::shared_ptr snapshot_saver) -{ - // TODO: save the current nexus data - auto unit_name = this->unit_name(); - // XXX Handle self, then recursively pass responsibility to Layers - for (auto& layer : layers_) { - layer->save_state_snapshot(snapshot_saver); - } -} - void NgenSimulation::save_end_of_run(std::shared_ptr snapshot_saver) { for (auto& layer : layers_) { - layer->save_state_snapshot(snapshot_saver); + layer->save_end_of_run(snapshot_saver); } #if NGEN_WITH_ROUTING if (this->mpi_rank_ == 0 && this->py_troute_) { @@ -148,11 +167,21 @@ void NgenSimulation::save_end_of_run(std::shared_ptr snaps #endif // NGEN_WITH_ROUTING } -void NgenSimulation::load_state_snapshot(std::shared_ptr snapshot_loader) { - // TODO: load the state data related to nexus outflows - auto unit_name = this->unit_name(); +void NgenSimulation::save_checkpoint(std::shared_ptr snapshot_saver) { +#if NGEN_WITH_MPI + // Remote data is currently handled by MPI, so saving a checkpoint without retreiving all messages could lose data. + // An example would be checkpointing every 100 steps with two ranks. Rank 1 is faster than Rank 0, so it saves + // step 200 whilst Rank 1 has only saved step 100. If checkpoints are loaded, all data in MPI messages from Rank 1 + // for steps 100-199 will be lost, and the program will likely hang as Rank 0 waits for those messages from Rank 1. + // For now, set up a barrier is set up to make sure all ranks can catch up before checkpointing. + // A possible later improvement would be the ability to store and recreate the MPI messages when saving/loading checkpoints. + if (this->mpi_num_procs_ > 1) { + MPI_Barrier(MPI_COMM_WORLD); + } +#endif + snapshot_saver->archive_unit(this->unit_name(), this); for (auto& layer : layers_) { - layer->load_state_snapshot(snapshot_loader); + layer->save_checkpoint(snapshot_saver); } } @@ -185,6 +214,24 @@ void NgenSimulation::load_hot_start(std::shared_ptr snaps #endif // NGEN_WITH_ROUTING } +void NgenSimulation::load_checkpoint(std::shared_ptr checkpoint_loader) { + checkpoint_loader->dearchive_unit(this->unit_name(), this); + for (auto& layer : layers_) { + layer->load_checkpoint(checkpoint_loader); + } +} + +std::vector NgenSimulation::required_checkpoint_units() const { + std::vector units; + units.push_back(this->unit_name()); + for (const auto &layer : this->layers_) { + const auto layer_units = layer->required_checkpoint_units(); + for (const auto &unit : layer_units) + units.push_back(unit); + } + return units; +} + void NgenSimulation::make_troute(const std::string &t_route_config_file_with_path) { #if NGEN_WITH_ROUTING @@ -301,7 +348,7 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s // case a future implementation needs these two values from the ngen framework. int delta_time = sim_time_->get_output_interval_seconds(); - // model for routing + // if the t-route model was not created from a hot start load, make it now if (this->py_troute_ == NULL) { this->make_troute(t_route_config_file_with_path); } @@ -346,16 +393,14 @@ std::string NgenSimulation::get_timestamp_for_step(int step) const } template -void NgenSimulation::serialize(Archive& ar) { +void NgenSimulation::serialize(Archive& ar, const unsigned int version) { /* Handle `catchment_formulation_manager` specially in the * overall checkpoint/restart logic, so that we can subset * individual catchments and do other tricky things */ //ar & catchment_formulation_manager ar & simulation_step_; - ar & sim_time_; - - // Layers will be reconstructed, but their internal time keeping needs to be serialized + ar & (*sim_time_); // Nexus and catchment indexes could be re-generated, but only if // the set of catchments remains consistent diff --git a/src/realizations/catchment/Bmi_Module_Formulation.cpp b/src/realizations/catchment/Bmi_Module_Formulation.cpp index 359a257f88..97a5c1eedf 100644 --- a/src/realizations/catchment/Bmi_Module_Formulation.cpp +++ b/src/realizations/catchment/Bmi_Module_Formulation.cpp @@ -25,14 +25,14 @@ namespace realization { // Rely on Formulation_Manager also using this->get_id() // as a unique key for the individual catchment // formulations - saver->save_unit(this->get_id(), data); + saver->save_unit(this->save_state_unit_name(), data); this->free_serialization_state(); } void Bmi_Module_Formulation::load_state(std::shared_ptr loader) { std::vector buffer; - loader->load_unit(this->get_id(), buffer); + loader->load_unit(this->save_state_unit_name(), buffer); boost::span data(buffer.data(), buffer.size()); this->load_serialization_state(data); } diff --git a/src/realizations/catchment/Bmi_Multi_Formulation.cpp b/src/realizations/catchment/Bmi_Multi_Formulation.cpp index ed5b296193..bb6b8a627a 100644 --- a/src/realizations/catchment/Bmi_Multi_Formulation.cpp +++ b/src/realizations/catchment/Bmi_Multi_Formulation.cpp @@ -26,7 +26,7 @@ using namespace realization; void Bmi_Multi_Formulation::save_state(std::shared_ptr saver) { - LOG(LogLevel::DEBUG, "Saving state for Multi-BMI %s", this->get_id()); + LOG(LogLevel::DEBUG, "Saving state for Multi-BMI %s", this->save_state_unit_name()); vecbuf data; boost::archive::binary_oarchive archive(data); // serialization function handles freeing the sub-BMI states after archiving them @@ -38,13 +38,13 @@ void Bmi_Multi_Formulation::save_state(std::shared_ptr sav bmi->free_serialization_state(); } boost::span span(data.data(), data.size()); - saver->save_unit(this->get_id(), span); + saver->save_unit(this->save_state_unit_name(), span); } void Bmi_Multi_Formulation::load_state(std::shared_ptr loader) { - LOG(LogLevel::DEBUG, "Loading save state for Multi-BMI %s", this->get_id()); + LOG(LogLevel::DEBUG, "Loading save state for Multi-BMI %s", this->save_state_unit_name()); std::vector data; - loader->load_unit(this->get_id(), data); + loader->load_unit(this->save_state_unit_name(), data); membuf stream(data.data(), data.size()); boost::archive::binary_iarchive archive(stream); archive >> (*this); diff --git a/src/state_save_restore/File_Per_Unit.cpp b/src/state_save_restore/File_Per_Unit.cpp index 6d355d4c93..a6b3e4fa6e 100644 --- a/src/state_save_restore/File_Per_Unit.cpp +++ b/src/state_save_restore/File_Per_Unit.cpp @@ -20,17 +20,6 @@ #include #include -namespace unit_saving_utils { - std::string format_epoch(State_Saver::snapshot_time_t epoch) - { - time_t t = std::chrono::system_clock::to_time_t(epoch); - std::tm tm = *std::gmtime(&t); - - std::stringstream tss; - tss << std::put_time(&tm, "%Y-%m-%dT%H:%M:%S"); - return tss.str(); - } -} // This class is only declared and defined here, in the .cpp file, // because it is strictly an implementation detail of the top-level @@ -66,9 +55,9 @@ std::shared_ptr File_Per_Unit_Saver::initialize_snapshot(S return std::make_shared(path(this->base_path_), durability); } -std::shared_ptr File_Per_Unit_Saver::initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) +std::shared_ptr File_Per_Unit_Saver::initialize_checkpoint_snapshot(int step, State_Durability durability) { - path checkpoint_path = path(this->base_path_) / unit_saving_utils::format_epoch(epoch); + path checkpoint_path = path(this->base_path_) / std::to_string(step); create_directory(checkpoint_path); return std::make_shared(checkpoint_path, durability); } @@ -188,9 +177,36 @@ std::shared_ptr File_Per_Unit_Loader::initialize_snapshot return std::make_shared(path(dir_path_)); } -std::shared_ptr File_Per_Unit_Loader::initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) +std::shared_ptr File_Per_Unit_Loader::initialize_checkpoint_snapshot(const std::vector &required_units, int *const checkpoint_step) { - path checkpoint_path = path(dir_path_) / unit_saving_utils::format_epoch(epoch);; - return std::make_shared(checkpoint_path); + std::vector options; + for (const auto &subdir : directory_iterator(dir_path_)) { + path subdir_path = subdir.path(); + // make sure subfolder is a number from a timestep + if (subdir_path.filename().string().find_first_not_of("0123456789") == std::string::npos) { + options.push_back(subdir); + } + } + // sort options by the highest number representation + std::sort(options.begin(), options.end(), [](const path &a, const path &b) { + return std::stoi(a.filename().string()) > std::stoi(b.filename().string()); + }); + for (const path &option : options) { + auto loader = std::make_shared(option); + bool passes = true; + for (const auto &unit : required_units) { + if (!loader->has_unit(unit)) { + passes = false; + break; + } + } + if (passes) { + *checkpoint_step = std::stoi(option.filename().string()); + return loader; + } + } + std::string error = "No checkpoint location found with all required units in root directory " + this->dir_path_; + LOG(LogLevel::FATAL, error); + throw std::runtime_error(error); } diff --git a/src/state_save_restore/State_Save_Restore.cpp b/src/state_save_restore/State_Save_Restore.cpp index 4a54b1a21c..7486936979 100644 --- a/src/state_save_restore/State_Save_Restore.cpp +++ b/src/state_save_restore/State_Save_Restore.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "ewts_ngen/logger.hpp" @@ -8,6 +9,9 @@ #include +#include +#include + State_Save_Config::State_Save_Config(boost::property_tree::ptree const& tree) { auto maybe = tree.get_child_optional("state_saving"); @@ -19,6 +23,8 @@ State_Save_Config::State_Save_Config(boost::property_tree::ptree const& tree) } bool hot_start = false; + bool checkpoint_load = false; + bool checkpoint_save = false; for (const auto& saving_config : *maybe) { try { auto& subtree = saving_config.second; @@ -27,13 +33,25 @@ State_Save_Config::State_Save_Config(boost::property_tree::ptree const& tree) auto where = subtree.get("path"); auto how = subtree.get("type"); auto when = subtree.get("when"); + auto frequency = subtree.get_optional("frequency"); - instance i{direction, what, where, how, when}; + instance i{direction, what, where, how, when, frequency}; if (i.timing_ == State_Save_When::StartOfRun && i.direction_ == State_Save_Direction::Load) { if (hot_start) throw std::runtime_error("Only one hot start state saving configuration is allowed."); hot_start = true; } + if (i.timing_ == State_Save_When::Checkpoint) { + if (i.direction_ == State_Save_Direction::Load) { + if (checkpoint_load) + throw std::runtime_error("Only one checkpointing load state saving configuration is allowed."); + checkpoint_load = true; + } else if (i.direction_ == State_Save_Direction::Save) { + if (checkpoint_save) + throw std::runtime_error("Only one checkpointing save state saving configuration is allowed."); + checkpoint_save = true; + } + } instances_.push_back(i); } catch (std::exception &e) { LOG("Bad state saving config: " + std::string(e.what()), LogLevel::WARNING); @@ -86,12 +104,50 @@ std::unique_ptr State_Save_Config::hot_start() const { } } } - return std::unique_ptr(); + return NULL; } -State_Save_Config::instance::instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing) +std::unique_ptr State_Save_Config::checkpoint_loader() const { + for (const auto &i : this->instances_) { + if (i.direction_ == State_Save_Direction::Load && i.timing_ == State_Save_When::Checkpoint) { + if (i.mechanism_ == State_Save_Mechanism::FilePerUnit) { + return std::make_unique(i.path_); + } else { + Logger::LogAndThrow("State_Save_Config: Loading mechanism " + i.mechanism_string() + " is not supported for checkpoint loading."); + } + } + } + return NULL; +} + +bool State_Save_Config::has_checkpoint_saver() const { + for (const auto &i : this->instances_) { + if (i.direction_ == State_Save_Direction::Save && i.timing_ == State_Save_When::Checkpoint) { + return true; + } + } + return false; +} + +std::shared_ptr State_Save_Config::checkpoint_saver(int *const frequency) const { + for (const auto &i : this->instances_) { + if (i.direction_ == State_Save_Direction::Save && i.timing_ == State_Save_When::Checkpoint) { + if (i.mechanism_ == State_Save_Mechanism::FilePerUnit) { + *frequency = i.frequency_; + return std::make_shared(i.path_); + } else { + Logger::LogAndThrow("State_Save_Config: Saving mechanism " + i.mechanism_string() + " is not supported for checkpoint saving."); + } + } + } + Logger::LogAndThrow("State_Save_Config: Failed to find a suitable checkpoint saving configuration."); + return NULL; +} + +State_Save_Config::instance::instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing, boost::optional frequency) : label_(label) , path_(path) + , frequency_{-1} { if (direction == "save") { direction_ = State_Save_Direction::Save; @@ -115,10 +171,22 @@ State_Save_Config::instance::instance(std::string const& direction, std::string if (timing == "EndOfRun") { timing_ = State_Save_When::EndOfRun; - } else if (timing == "FirstOfMonth") { - timing_ = State_Save_When::FirstOfMonth; } else if (timing == "StartOfRun") { timing_ = State_Save_When::StartOfRun; + } else if (timing == "Checkpoint") { // starts with "Checkpoint" + timing_ = State_Save_When::Checkpoint; + if (direction_ == State_Save_Direction::Save) { + if (!frequency.has_value()) { + Logger::LogAndThrow("The checkpoint save configuration is missing a 'frequency' of checkpointing."); + } + frequency_ = frequency.get(); + // make sure the frequency makes sense + if (frequency_ <= 0) { + std::stringstream ss; + ss << "The frequency of a checkpoint save must be greater than 0. The configured value is " << frequency_; + Logger::LogAndThrow(ss.str()); + } + } } else { std::string message = "Unrecognized state saving timing '" + timing + "'"; std::string throw_msg; throw_msg.assign(message); @@ -143,12 +211,3 @@ State_Snapshot_Saver::State_Snapshot_Saver(State_Saver::State_Durability durabil { } - -State_Saver::snapshot_time_t State_Saver::snapshot_time_now() { -#if __cplusplus < 201703L // C++ < 17 - auto now = std::chrono::system_clock::now(); - return std::chrono::time_point_cast(now); -#else - return std::chrono::floor(std::chrono::system_clock::now()); -#endif -} From 29f211fdba4f3add6f7623057378183256a2cbdf Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 2 Apr 2026 07:54:00 -0400 Subject: [PATCH 2/7] Delete state data at least two checkpoints old --- include/core/NgenSimulation.hpp | 4 +- include/state_save_restore/File_Per_Unit.hpp | 4 +- .../state_save_restore/State_Save_Restore.hpp | 8 ++- src/NGen.cpp | 3 +- src/core/NgenSimulation.cpp | 23 ++++----- src/state_save_restore/File_Per_Unit.cpp | 49 +++++++++++++------ 6 files changed, 60 insertions(+), 31 deletions(-) diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 7089844234..5d4ebcf8b4 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -84,14 +84,14 @@ class NgenSimulation void load_hot_start(std::shared_ptr snapshot_loader, const std::string &t_route_config_file_with_path); // Load a snapshot of a checkpoint from a previous run. This will create a T-Route python adapter if the loader finds a unit for it (most likely to happen if the checkpoint is derived from ) void load_checkpoint(std::shared_ptr checkpoint_loader); + // Save a snapshot that preserves data related to the current time and could be loaded to jump a simulation into the middle of a run. + void save_checkpoint(std::shared_ptr snapshot_saver); std::vector required_checkpoint_units() const; private: void advance_models_one_output_step(); - void save_checkpoint(std::shared_ptr snapshot_saver); - int simulation_step_; std::shared_ptr sim_time_; diff --git a/include/state_save_restore/File_Per_Unit.hpp b/include/state_save_restore/File_Per_Unit.hpp index aa95450915..a3ad8fb66f 100644 --- a/include/state_save_restore/File_Per_Unit.hpp +++ b/include/state_save_restore/File_Per_Unit.hpp @@ -13,6 +13,8 @@ class File_Per_Unit_Saver : public State_Saver std::shared_ptr initialize_checkpoint_snapshot(int step, State_Durability durability) override; + void clear_cache(int mpi_rank) override; + void finalize() override; private: @@ -30,7 +32,7 @@ class File_Per_Unit_Loader : public State_Loader std::shared_ptr initialize_snapshot() override; - std::shared_ptr initialize_checkpoint_snapshot(const std::vector &required_units, int *const checkpoint_step) override; + std::shared_ptr initialize_checkpoint_snapshot(const std::vector &required_units) override; private: std::string dir_path_; }; diff --git a/include/state_save_restore/State_Save_Restore.hpp b/include/state_save_restore/State_Save_Restore.hpp index 5bb13677af..4dfb3531c6 100644 --- a/include/state_save_restore/State_Save_Restore.hpp +++ b/include/state_save_restore/State_Save_Restore.hpp @@ -100,6 +100,12 @@ class State_Saver virtual std::shared_ptr initialize_checkpoint_snapshot(int step, State_Durability durability) = 0; + /** Clear unneeded data that may be generated over the lifetime of a State_Saver's lifetime + * + * @param mpi_rank The process' MPI rank that may be used for determining responsibility of cleanup. + */ + virtual void clear_cache(int mpi_rank) = 0; + /** * Execute any logic necessary to cleanly finish usage, and * potentially report errors, before destructors would @@ -167,7 +173,7 @@ class State_Loader * @param required_units Vector of all units that are required for a simulation state to be considered complete. * @param checkpoint_step Pointer that will be set to the step number of the valid state. */ - virtual std::shared_ptr initialize_checkpoint_snapshot(const std::vector &required_units, int *const checkpoint_step) = 0; + virtual std::shared_ptr initialize_checkpoint_snapshot(const std::vector &required_units) = 0; /** * Execute any logic necessary to cleanly finish usage, and diff --git a/src/NGen.cpp b/src/NGen.cpp index 0c277b7c0d..4972330483 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -710,14 +710,13 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { } } - int checkpoint_step = 0; { // optionally load a checkpoint if configured auto checkpoint_loader = state_saving_config.checkpoint_loader(); if (checkpoint_loader) { LOG(LogLevel::INFO, "Loading checkpoint data from prior snapshot."); const std::vector required_units = simulation->required_checkpoint_units(); std::shared_ptr snapshot_loader - = checkpoint_loader->initialize_checkpoint_snapshot(required_units, &checkpoint_step); + = checkpoint_loader->initialize_checkpoint_snapshot(required_units); simulation->load_checkpoint(snapshot_loader); } } diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index 156a252b2d..afb92cbe7e 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -80,6 +80,18 @@ void NgenSimulation::run_catchments(std::shared_ptr checkpoint_save } if (simulation_step_ != 0 && simulation_step_ % frequency == 0) { +#if NGEN_WITH_MPI + // Remote data is currently handled by MPI, so saving a checkpoint without retreiving all messages could lose data. + // An example would be checkpointing every 100 steps with two ranks. Rank 1 is faster than Rank 0, so it saves + // step 200 whilst Rank 1 has only saved step 100. If checkpoints are loaded, all data in MPI messages from Rank 1 + // for steps 100-199 will be lost, and the program will likely hang as Rank 0 waits for those messages from Rank 1. + // For now, set up a barrier to make sure all ranks can catch up before checkpointing. + // A possible later improvement would be the ability to store and recreate the MPI messages when saving/loading checkpoints. + if (this->mpi_num_procs_ > 1) { + MPI_Barrier(MPI_COMM_WORLD); + } +#endif // NGEN_WITH_MPI + checkpoint_saver->clear_cache(this->mpi_rank_); auto step_saver = checkpoint_saver->initialize_checkpoint_snapshot(simulation_step_, State_Saver::State_Durability::strict); this->save_checkpoint(step_saver); } @@ -168,17 +180,6 @@ void NgenSimulation::save_end_of_run(std::shared_ptr snaps } void NgenSimulation::save_checkpoint(std::shared_ptr snapshot_saver) { -#if NGEN_WITH_MPI - // Remote data is currently handled by MPI, so saving a checkpoint without retreiving all messages could lose data. - // An example would be checkpointing every 100 steps with two ranks. Rank 1 is faster than Rank 0, so it saves - // step 200 whilst Rank 1 has only saved step 100. If checkpoints are loaded, all data in MPI messages from Rank 1 - // for steps 100-199 will be lost, and the program will likely hang as Rank 0 waits for those messages from Rank 1. - // For now, set up a barrier is set up to make sure all ranks can catch up before checkpointing. - // A possible later improvement would be the ability to store and recreate the MPI messages when saving/loading checkpoints. - if (this->mpi_num_procs_ > 1) { - MPI_Barrier(MPI_COMM_WORLD); - } -#endif snapshot_saver->archive_unit(this->unit_name(), this); for (auto& layer : layers_) { layer->save_checkpoint(snapshot_saver); diff --git a/src/state_save_restore/File_Per_Unit.cpp b/src/state_save_restore/File_Per_Unit.cpp index a6b3e4fa6e..28c7600e82 100644 --- a/src/state_save_restore/File_Per_Unit.cpp +++ b/src/state_save_restore/File_Per_Unit.cpp @@ -17,9 +17,30 @@ #error "No Filesystem library implementation available" #endif +#include +#include +#include + #include #include +namespace { + // Populate a vector of paths with subfolders with names that can be interpreted as ints. + // The vector will be sorted by highest numeric representation first. + void ordered_checkpoint_subfolders(const std::string &root, std::vector &subdirs) { + for (const auto &subdir : directory_iterator(root)) { + path subdir_path = subdir.path(); + // make sure subfolder is a number from a timestep + if (subdir_path.filename().string().find_first_not_of("0123456789") == std::string::npos) { + subdirs.push_back(subdir); + } + } + // sort options by the highest number representation + std::sort(subdirs.begin(), subdirs.end(), [](const path &a, const path &b) { + return std::stoi(a.filename().string()) > std::stoi(b.filename().string()); + }); + } +} // This class is only declared and defined here, in the .cpp file, // because it is strictly an implementation detail of the top-level @@ -51,7 +72,6 @@ File_Per_Unit_Saver::File_Per_Unit_Saver(std::string base_path) File_Per_Unit_Saver::~File_Per_Unit_Saver() = default; std::shared_ptr File_Per_Unit_Saver::initialize_snapshot(State_Durability durability) { - // TODO return std::make_shared(path(this->base_path_), durability); } @@ -62,6 +82,17 @@ std::shared_ptr File_Per_Unit_Saver::initialize_checkpoint return std::make_shared(checkpoint_path, durability); } +void File_Per_Unit_Saver::clear_cache(int mpi_rank) { + if (mpi_rank == 0) { // reserve file system deletion to just the main MPI rank + std::vector subdirs; + ordered_checkpoint_subfolders(this->base_path_, subdirs); + // delete all checkpoint directories save the most recent one + for (int i = 1; i < subdirs.size(); ++i) { + remove_all(subdirs[i]); + } + } +} + void File_Per_Unit_Saver::finalize() { // nothing to be done @@ -177,20 +208,10 @@ std::shared_ptr File_Per_Unit_Loader::initialize_snapshot return std::make_shared(path(dir_path_)); } -std::shared_ptr File_Per_Unit_Loader::initialize_checkpoint_snapshot(const std::vector &required_units, int *const checkpoint_step) +std::shared_ptr File_Per_Unit_Loader::initialize_checkpoint_snapshot(const std::vector &required_units) { std::vector options; - for (const auto &subdir : directory_iterator(dir_path_)) { - path subdir_path = subdir.path(); - // make sure subfolder is a number from a timestep - if (subdir_path.filename().string().find_first_not_of("0123456789") == std::string::npos) { - options.push_back(subdir); - } - } - // sort options by the highest number representation - std::sort(options.begin(), options.end(), [](const path &a, const path &b) { - return std::stoi(a.filename().string()) > std::stoi(b.filename().string()); - }); + ordered_checkpoint_subfolders(this->dir_path_, options); for (const path &option : options) { auto loader = std::make_shared(option); bool passes = true; @@ -201,7 +222,7 @@ std::shared_ptr File_Per_Unit_Loader::initialize_checkpoi } } if (passes) { - *checkpoint_step = std::stoi(option.filename().string()); + LOG(LogLevel::INFO, "Loading state from checkpoint step " + option.filename().string()); return loader; } } From ee7df47ca365f2b91cfcddfcba0d5c56e8e9f880 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 2 Apr 2026 12:17:45 -0400 Subject: [PATCH 3/7] Double sync processes to prevent deleting good checkpoint folder --- include/core/NgenSimulation.hpp | 2 ++ src/core/NgenSimulation.cpp | 14 +++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 5d4ebcf8b4..90e025dbf2 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -92,6 +92,8 @@ class NgenSimulation private: void advance_models_one_output_step(); + inline void sync_mpi_ranks() const; + int simulation_step_; std::shared_ptr sim_time_; diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index afb92cbe7e..34e526f8d6 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -80,24 +80,28 @@ void NgenSimulation::run_catchments(std::shared_ptr checkpoint_save } if (simulation_step_ != 0 && simulation_step_ % frequency == 0) { -#if NGEN_WITH_MPI // Remote data is currently handled by MPI, so saving a checkpoint without retreiving all messages could lose data. // An example would be checkpointing every 100 steps with two ranks. Rank 1 is faster than Rank 0, so it saves // step 200 whilst Rank 1 has only saved step 100. If checkpoints are loaded, all data in MPI messages from Rank 1 // for steps 100-199 will be lost, and the program will likely hang as Rank 0 waits for those messages from Rank 1. // For now, set up a barrier to make sure all ranks can catch up before checkpointing. // A possible later improvement would be the ability to store and recreate the MPI messages when saving/loading checkpoints. - if (this->mpi_num_procs_ > 1) { - MPI_Barrier(MPI_COMM_WORLD); - } -#endif // NGEN_WITH_MPI + this->sync_mpi_ranks(); checkpoint_saver->clear_cache(this->mpi_rank_); + this->sync_mpi_ranks(); auto step_saver = checkpoint_saver->initialize_checkpoint_snapshot(simulation_step_, State_Saver::State_Durability::strict); this->save_checkpoint(step_saver); } } } +void NgenSimulation::sync_mpi_ranks() const { +#if NGEN_WITH_MPI + if (this->mpi_num_procs_ > 1) + MPI_Barrier(MPI_COMM_WORLD); +#endif // NGEN_WITH_MPI +} + void NgenSimulation::finalize() { #if NGEN_WITH_ROUTING if (this->py_troute_) { From 79da0a30b0da0e6883718cf1fed9c6fb21535356 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Mon, 6 Apr 2026 10:29:43 -0400 Subject: [PATCH 4/7] Add MPI checkpoint load safety --- include/core/NgenSimulation.hpp | 7 ++- include/state_save_restore/File_Per_Unit.hpp | 2 +- .../state_save_restore/State_Save_Restore.hpp | 4 +- src/NGen.cpp | 2 +- src/core/NgenSimulation.cpp | 49 +++++++++++++------ src/state_save_restore/File_Per_Unit.cpp | 2 +- 6 files changed, 44 insertions(+), 22 deletions(-) diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 90e025dbf2..d009ccdfde 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -87,7 +87,12 @@ class NgenSimulation // Save a snapshot that preserves data related to the current time and could be loaded to jump a simulation into the middle of a run. void save_checkpoint(std::shared_ptr snapshot_saver); - std::vector required_checkpoint_units() const; + /** + * Get a vector of all the required units the NgenSimulation would need to load a checkpoint state. + * + * @param merge_all_ranks Whether to also get the units from other MPI ranks. This should only be `true` when calling blocking MPI processes is safe for the program. + */ + std::vector required_checkpoint_units(bool merge_all_ranks) const; private: void advance_models_one_output_step(); diff --git a/include/state_save_restore/File_Per_Unit.hpp b/include/state_save_restore/File_Per_Unit.hpp index a3ad8fb66f..610f978c87 100644 --- a/include/state_save_restore/File_Per_Unit.hpp +++ b/include/state_save_restore/File_Per_Unit.hpp @@ -13,7 +13,7 @@ class File_Per_Unit_Saver : public State_Saver std::shared_ptr initialize_checkpoint_snapshot(int step, State_Durability durability) override; - void clear_cache(int mpi_rank) override; + void clear_prior(int mpi_rank) override; void finalize() override; diff --git a/include/state_save_restore/State_Save_Restore.hpp b/include/state_save_restore/State_Save_Restore.hpp index 4dfb3531c6..f05b97a41f 100644 --- a/include/state_save_restore/State_Save_Restore.hpp +++ b/include/state_save_restore/State_Save_Restore.hpp @@ -100,11 +100,11 @@ class State_Saver virtual std::shared_ptr initialize_checkpoint_snapshot(int step, State_Durability durability) = 0; - /** Clear unneeded data that may be generated over the lifetime of a State_Saver's lifetime + /** Clear data related to states prior to the last save state associated with this State_Saver * * @param mpi_rank The process' MPI rank that may be used for determining responsibility of cleanup. */ - virtual void clear_cache(int mpi_rank) = 0; + virtual void clear_prior(int mpi_rank) = 0; /** * Execute any logic necessary to cleanly finish usage, and diff --git a/src/NGen.cpp b/src/NGen.cpp index 4972330483..dcd4ea5544 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -714,7 +714,7 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { auto checkpoint_loader = state_saving_config.checkpoint_loader(); if (checkpoint_loader) { LOG(LogLevel::INFO, "Loading checkpoint data from prior snapshot."); - const std::vector required_units = simulation->required_checkpoint_units(); + const std::vector required_units = simulation->required_checkpoint_units(true); std::shared_ptr snapshot_loader = checkpoint_loader->initialize_checkpoint_snapshot(required_units); simulation->load_checkpoint(snapshot_loader); diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index 34e526f8d6..cfa49f2c27 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -21,8 +21,26 @@ #include "parallel_utils.h" namespace { - const auto NGEN_UNIT_NAME = "ngen"; const auto TROUTE_UNIT_NAME = "troute"; + + #if NGEN_WITH_MPI + // Merge strings from multiple MPI ranks and broadcast the results to all ranks. Duplicates will be removed. + std::vector collect_unique_strings(const std::vector &items, int mpi_rank, int mpi_num_procs) { + // MPI_Gather all items to rank 0 + std::vector all_items + = parallel::gather_strings(items, mpi_rank, mpi_num_procs); + if (mpi_rank == 0) { + // filter to only the unique items + std::sort(all_items.begin(), all_items.end()); + all_items.erase( + std::unique(all_items.begin(), all_items.end()), + all_items.end() + ); + } + // MPI_Broadcast unique, sorted results to all ranks + return parallel::broadcast_strings(all_items, mpi_rank, mpi_num_procs); + } + #endif // NGEN_WITH_MPI } NgenSimulation::NgenSimulation( @@ -87,10 +105,10 @@ void NgenSimulation::run_catchments(std::shared_ptr checkpoint_save // For now, set up a barrier to make sure all ranks can catch up before checkpointing. // A possible later improvement would be the ability to store and recreate the MPI messages when saving/loading checkpoints. this->sync_mpi_ranks(); - checkpoint_saver->clear_cache(this->mpi_rank_); - this->sync_mpi_ranks(); auto step_saver = checkpoint_saver->initialize_checkpoint_snapshot(simulation_step_, State_Saver::State_Durability::strict); this->save_checkpoint(step_saver); + this->sync_mpi_ranks(); + checkpoint_saver->clear_prior(this->mpi_rank_); } } } @@ -226,7 +244,8 @@ void NgenSimulation::load_checkpoint(std::shared_ptr chec } } -std::vector NgenSimulation::required_checkpoint_units() const { + +std::vector NgenSimulation::required_checkpoint_units(bool merge_all_ranks) const { std::vector units; units.push_back(this->unit_name()); for (const auto &layer : this->layers_) { @@ -234,6 +253,12 @@ std::vector NgenSimulation::required_checkpoint_units() const { for (const auto &unit : layer_units) units.push_back(unit); } +#if NGEN_WITH_MPI + if (merge_all_ranks && this->mpi_num_procs_ > 1) { + // merge all ranks' units so to ensure all read from the same source + units = collect_unique_strings(units, this->mpi_rank_, this->mpi_num_procs_); + } +#endif // NGEN_WITH_MPI return units; } @@ -292,18 +317,10 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s for (const auto& nexus : nexus_indexes_) { local_nexus_ids.push_back(nexus.first); } - // MPI_Gather all nexus IDs into a single vector - std::vector all_nexus_ids = parallel::gather_strings(local_nexus_ids, mpi_rank_, mpi_num_procs_); - if (mpi_rank_ == 0) { - // filter to only the unique IDs - std::sort(all_nexus_ids.begin(), all_nexus_ids.end()); - all_nexus_ids.erase( - std::unique(all_nexus_ids.begin(), all_nexus_ids.end()), - all_nexus_ids.end() - ); - } - // MPI_Broadcast so all processes share the nexus IDs - all_nexus_ids = std::move(parallel::broadcast_strings(all_nexus_ids, mpi_rank_, mpi_num_procs_)); + // gather all nexus IDs into a single vector + std::vector all_nexus_ids = std::move(collect_unique_strings( + local_nexus_ids, mpi_rank_, mpi_num_procs_ + )); // MPI_Reduce to collect the results from processes if (mpi_rank_ == 0) { diff --git a/src/state_save_restore/File_Per_Unit.cpp b/src/state_save_restore/File_Per_Unit.cpp index 28c7600e82..da466d4aa4 100644 --- a/src/state_save_restore/File_Per_Unit.cpp +++ b/src/state_save_restore/File_Per_Unit.cpp @@ -82,7 +82,7 @@ std::shared_ptr File_Per_Unit_Saver::initialize_checkpoint return std::make_shared(checkpoint_path, durability); } -void File_Per_Unit_Saver::clear_cache(int mpi_rank) { +void File_Per_Unit_Saver::clear_prior(int mpi_rank) { if (mpi_rank == 0) { // reserve file system deletion to just the main MPI rank std::vector subdirs; ordered_checkpoint_subfolders(this->base_path_, subdirs); From 52fc6ff0bfbca8ff359cf69dd87d40e3622f5a01 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Tue, 7 Apr 2026 10:15:02 -0400 Subject: [PATCH 5/7] Advance step before saving checkpoint --- src/core/NgenSimulation.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index cfa49f2c27..af96d5b05c 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -85,19 +85,27 @@ void NgenSimulation::run_catchments() void NgenSimulation::run_catchments(std::shared_ptr checkpoint_saver, int frequency) { int num_times = get_num_output_times(); + if (frequency > num_times) { + LOG(LogLevel::WARNING, "The frequency of checkpoints (%d) is less than the number of simulation steps (%d). No checkpoints will be generated.", frequency, num_times); + } - for (; simulation_step_ < num_times; simulation_step_++) { + while (simulation_step_ < num_times) { // Make room for this output step's results catchment_outflows_.resize(catchment_outflows_.size() + catchment_indexes_.size(), 0.0); nexus_downstream_flows_.resize(nexus_downstream_flows_.size() + nexus_indexes_.size(), 0.0); advance_models_one_output_step(); - if (simulation_step_ + 1 < num_times) { + // advance_models_one_output_step runs the BMIs for the next time step, + // so the checkpoint state should include advancing the step + this->simulation_step_++; + + if (simulation_step_ < num_times) { sim_time_->advance_timestep(); } - if (simulation_step_ != 0 && simulation_step_ % frequency == 0) { + // this position allows creating a checkpoint on the very last step. This might be useful if t-route fails and we want to "skip" the final steps + if (simulation_step_ % frequency == 0) { // Remote data is currently handled by MPI, so saving a checkpoint without retreiving all messages could lose data. // An example would be checkpointing every 100 steps with two ranks. Rank 1 is faster than Rank 0, so it saves // step 200 whilst Rank 1 has only saved step 100. If checkpoints are loaded, all data in MPI messages from Rank 1 From 195038f9e2269e57066a64570b736d53f1df7f90 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Tue, 7 Apr 2026 12:49:44 -0400 Subject: [PATCH 6/7] Advance Forcing Engine on checkpoint load --- include/forcing/ForcingsEngineDataProvider.hpp | 11 +++++++++++ include/simulation_time/Simulation_Time.hpp | 9 +++++++++ src/core/NgenSimulation.cpp | 10 ++++++++++ 3 files changed, 30 insertions(+) diff --git a/include/forcing/ForcingsEngineDataProvider.hpp b/include/forcing/ForcingsEngineDataProvider.hpp index 5987c9bf48..3e49b6dbaf 100644 --- a/include/forcing/ForcingsEngineDataProvider.hpp +++ b/include/forcing/ForcingsEngineDataProvider.hpp @@ -89,6 +89,17 @@ struct ForcingsEngineStorage { data_.clear(); } + /** Advance all forcing engine instances to update to the requested time. + * @param time The amount of seconds from the beginning of the simulation the forcing models will be advanaced to. + * This should use `0.0` as the start time and allow the backing models to update their own times accordingly. + */ + void advance_to(double time) { + for (auto &provider : data_) { + double bmi_start = provider.second->GetStartTime(); + provider.second->UpdateUntil(time + bmi_start); + } + } + private: //! Instance map of underlying BMI models. std::unordered_map data_; diff --git a/include/simulation_time/Simulation_Time.hpp b/include/simulation_time/Simulation_Time.hpp index 0c44426e16..a47a76c6a6 100644 --- a/include/simulation_time/Simulation_Time.hpp +++ b/include/simulation_time/Simulation_Time.hpp @@ -106,6 +106,15 @@ class Simulation_Time return current_date_time_epoch; } + /** + * @brief Accessor to the the current simulation time + * @return start_date_time_epoch + */ + time_t get_start_time() + { + return start_date_time_epoch; + } + /** * @brief Accessor to the current timestamp string * @return current_timestamp diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index af96d5b05c..09c2994bd0 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -8,6 +8,10 @@ #include "HY_Features.hpp" #endif +#if NGEN_WITH_PYTHON +#include +#endif + #include "state_save_restore/vecbuf.hpp" #include "state_save_restore/State_Save_Utils.hpp" #include "state_save_restore/State_Save_Restore.hpp" @@ -250,6 +254,12 @@ void NgenSimulation::load_checkpoint(std::shared_ptr chec for (auto& layer : layers_) { layer->load_checkpoint(checkpoint_loader); } +#if NGEN_WITH_PYTHON + // advance any forcing engine instances to make sure the first query doesn't get messy when catching up to the simulation's time + auto now = this->sim_time_->get_current_epoch_time(); + auto start = this->sim_time_->get_start_time(); + data_access::detail::ForcingsEngineStorage::instances.advance_to(now - start); +#endif // NGEN_WITH_PYTHON } From 36de6c4f1e28e1db95a2ccf967b5281fb6e75f31 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Tue, 7 Apr 2026 15:45:23 -0400 Subject: [PATCH 7/7] Update requirements for build of NgenSimulationTest --- CMakeLists.txt | 4 ++-- src/core/CMakeLists.txt | 1 + test/CMakeLists.txt | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a320eab3a0..4223b7e710 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -195,9 +195,9 @@ set(Boost_USE_MULTITHREADED ON) set(Boost_USE_STATIC_RUNTIME OFF) if(CMAKE_CXX_STANDARD LESS 17) # requires non-header filesystem for state saving if C++ 11 or lower - find_package(Boost 1.86.0 REQUIRED COMPONENTS system filesystem) + find_package(Boost 1.86.0 REQUIRED COMPONENTS system filesystem serialization) else() - find_package(Boost 1.86.0 REQUIRED) + find_package(Boost 1.86.0 REQUIRED COMPONENTS serialization) endif() # ----------------------------------------------------------------------------- diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 5b42bf9181..e8ea930ffc 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -10,6 +10,7 @@ target_link_libraries(core PUBLIC target_link_libraries(core PRIVATE NGen::parallel NGen::state_save_restore + Boost::serialization # needed for the NgenSimulation test to compile ) target_link_libraries(core PUBLIC diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index a6b26ad200..1f4919bf8a 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -461,6 +461,7 @@ ngen_add_test( NGen::logging NGen::core_mediator NGen::ngen_bmi + NGen::forcing ) ########################### Netcdf Forcing Tests