diff --git a/CMakeLists.txt b/CMakeLists.txt index a320eab3a0..4223b7e710 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -195,9 +195,9 @@ set(Boost_USE_MULTITHREADED ON) set(Boost_USE_STATIC_RUNTIME OFF) if(CMAKE_CXX_STANDARD LESS 17) # requires non-header filesystem for state saving if C++ 11 or lower - find_package(Boost 1.86.0 REQUIRED COMPONENTS system filesystem) + find_package(Boost 1.86.0 REQUIRED COMPONENTS system filesystem serialization) else() - find_package(Boost 1.86.0 REQUIRED) + find_package(Boost 1.86.0 REQUIRED COMPONENTS serialization) endif() # ----------------------------------------------------------------------------- diff --git a/include/core/Layer.hpp b/include/core/Layer.hpp index 643f1b6970..33c44fb473 100644 --- a/include/core/Layer.hpp +++ b/include/core/Layer.hpp @@ -8,7 +8,9 @@ #include "Simulation_Time.hpp" #include "State_Exception.hpp" #include "geojson/FeatureBuilder.hpp" +#include "state_save_restore/State_Save_Restore.hpp" #include +#include namespace hy_features { @@ -16,9 +18,6 @@ namespace hy_features class HY_Features_MPI; } -class State_Snapshot_Saver; -class State_Snapshot_Loader; - namespace ngen { @@ -113,10 +112,20 @@ namespace ngen std::unordered_map &nexus_indexes, int current_step); - virtual void save_state_snapshot(std::shared_ptr snapshot_saver); - virtual void load_state_snapshot(std::shared_ptr snapshot_loader); + /** + * Save the current state including metatdata related to current layer times + */ + virtual void save_checkpoint(std::shared_ptr snapshot_saver); + /** + * Save the current state excluding metatdata related to current layer times + */ + virtual void save_end_of_run(std::shared_ptr snapshot_saver); + virtual void load_checkpoint(std::shared_ptr snapshot_loader); virtual void load_hot_start(std::shared_ptr snapshot_loader); + std::string unit_name() const; + virtual std::vector required_checkpoint_units() const; + protected: const LayerDescription description; @@ -127,7 +136,14 @@ namespace ngen 
feature_type& features; //TODO is this really required at the top level? or can this be moved to SurfaceLayer? const geojson::GeoJSON catchment_data; - long output_time_index; + long output_time_index; + + // Serialization template will be defined and instantiated in the .cpp file + friend class boost::serialization::access; + template + void serialize(Archive& ar, const unsigned int version) { + ar & this->simulation_time; + } }; } diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 3189045edc..d009ccdfde 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -24,6 +24,8 @@ class State_Snapshot_Loader; #include #include +#include + // Contains all of the dynamic state and logic to run a NextGen hydrologic simulation class NgenSimulation { @@ -55,6 +57,9 @@ class NgenSimulation */ void run_catchments(); + // Alternative method of running catchments that includes saving checkpoints at a desired frequency. + void run_catchments(std::shared_ptr checkpoint_saver, int frequency); + // Tear down of any items stored on the NgenSimulation object that could throw errors and, thus, should be kept separate from the deconstructor. void finalize(); @@ -69,8 +74,6 @@ class NgenSimulation size_t get_num_output_times() const; std::string get_timestamp_for_step(int step) const; - void save_state_snapshot(std::shared_ptr snapshot_saver); - void load_state_snapshot(std::shared_ptr snapshot_loader); /** * Saves a snapshot state that's intended to be run at the end of a simulation. * @@ -79,10 +82,23 @@ class NgenSimulation void save_end_of_run(std::shared_ptr snapshot_saver); // Load a snapshot of the end of a previous run. This will create a T-Route python adapter if the loader finds a unit for it and the config path is not empty. void load_hot_start(std::shared_ptr snapshot_loader, const std::string &t_route_config_file_with_path); + // Load a snapshot of a checkpoint from a previous run. 
This restores the simulation step and time, then each layer's checkpointed state. Unlike load_hot_start, it does not create a T-Route python adapter. + void load_checkpoint(std::shared_ptr checkpoint_loader); + // Save a snapshot that preserves data related to the current time and could be loaded to jump a simulation into the middle of a run. + void save_checkpoint(std::shared_ptr snapshot_saver); + + /** + * Get a vector of all the required units the NgenSimulation would need to load a checkpoint state. + * + * @param merge_all_ranks Whether to also get the units from other MPI ranks. This should only be `true` when calling blocking MPI processes is safe for the program. + */ + std::vector required_checkpoint_units(bool merge_all_ranks) const; private: void advance_models_one_output_step(); + inline void sync_mpi_ranks() const; + int simulation_step_; std::shared_ptr sim_time_; @@ -106,8 +122,9 @@ class NgenSimulation int mpi_num_procs_; // Serialization template will be defined and instantiated in the .cpp file + friend class boost::serialization::access; template - void serialize(Archive& ar); + void serialize(Archive& ar, const unsigned int version); }; #endif diff --git a/include/forcing/ForcingsEngineDataProvider.hpp b/include/forcing/ForcingsEngineDataProvider.hpp index 5987c9bf48..3e49b6dbaf 100644 --- a/include/forcing/ForcingsEngineDataProvider.hpp +++ b/include/forcing/ForcingsEngineDataProvider.hpp @@ -89,6 +89,17 @@ struct ForcingsEngineStorage { data_.clear(); } + /** Advance all forcing engine instances to update to the requested time. + * @param time The amount of seconds from the beginning of the simulation the forcing models will be advanced to. + * This should use `0.0` as the start time and allow the backing models to update their own times accordingly. + */ + void advance_to(double time) { + for (auto &provider : data_) { + double bmi_start = provider.second->GetStartTime(); + provider.second->UpdateUntil(time + bmi_start); + } + } + private: //! 
Instance map of underlying BMI models. std::unordered_map data_; diff --git a/include/realizations/catchment/Bmi_Formulation.hpp b/include/realizations/catchment/Bmi_Formulation.hpp index 91236fea42..da6d72bc29 100644 --- a/include/realizations/catchment/Bmi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Formulation.hpp @@ -95,6 +95,11 @@ namespace realization { */ virtual void load_hot_start(std::shared_ptr loader) = 0; + // Unit name used for identifying a save state ID + inline std::string save_state_unit_name() const { + return this->get_id(); + } + /** * Convert a time value from the model to an epoch time in seconds. * diff --git a/include/simulation_time/Simulation_Time.hpp b/include/simulation_time/Simulation_Time.hpp index d2ea4969d4..a47a76c6a6 100644 --- a/include/simulation_time/Simulation_Time.hpp +++ b/include/simulation_time/Simulation_Time.hpp @@ -8,6 +8,8 @@ #include #include +#include + /** * @brief simulation_time_params providing configuration information for simulation time period. 
*/ @@ -104,6 +106,15 @@ class Simulation_Time return current_date_time_epoch; } + /** + * @brief Accessor to the simulation start time + * @return start_date_time_epoch + */ + time_t get_start_time() + { + return start_date_time_epoch; + } + /** * @brief Accessor to the current timestamp string * @return current_timestamp @@ -168,6 +179,12 @@ class Simulation_Time private: + friend class boost::serialization::access; + template + void serialize(Archive& ar, const unsigned int version) { + ar & this->current_date_time_epoch; + } + int total_output_times; int simulation_total_time_seconds; int output_interval_seconds; diff --git a/include/state_save_restore/File_Per_Unit.hpp b/include/state_save_restore/File_Per_Unit.hpp index faec8d966a..610f978c87 100644 --- a/include/state_save_restore/File_Per_Unit.hpp +++ b/include/state_save_restore/File_Per_Unit.hpp @@ -11,7 +11,9 @@ class File_Per_Unit_Saver : public State_Saver std::shared_ptr initialize_snapshot(State_Durability durability) override; - std::shared_ptr initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) override; + std::shared_ptr initialize_checkpoint_snapshot(int step, State_Durability durability) override; + + void clear_prior(int mpi_rank) override; void finalize() override; @@ -30,7 +32,7 @@ class File_Per_Unit_Loader : public State_Loader std::shared_ptr initialize_snapshot() override; - std::shared_ptr initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) override; + std::shared_ptr initialize_checkpoint_snapshot(const std::vector &required_units) override; private: std::string dir_path_; }; diff --git a/include/state_save_restore/State_Save_Restore.hpp b/include/state_save_restore/State_Save_Restore.hpp index 57a88b5f3e..f05b97a41f 100644 --- a/include/state_save_restore/State_Save_Restore.hpp +++ b/include/state_save_restore/State_Save_Restore.hpp @@ -5,6 +5,9 @@ #include #include +#include +#include +#include #include #include @@ -12,6 +15,7 @@ 
#include #include +#include "vecbuf.hpp" #include "State_Save_Utils.hpp" class State_Saver; @@ -49,15 +53,22 @@ class State_Save_Config */ std::unique_ptr hot_start() const; + std::unique_ptr checkpoint_loader() const; + + bool has_checkpoint_saver() const; + + std::shared_ptr checkpoint_saver(int *const frequency) const; + struct instance { - instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing); + instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing, boost::optional frequency); State_Save_Direction direction_; State_Save_Mechanism mechanism_; State_Save_When timing_; std::string label_; std::string path_; + int frequency_; std::string mechanism_string() const; }; @@ -69,8 +80,6 @@ class State_Save_Config class State_Saver { public: - using snapshot_time_t = std::chrono::time_point; - // Flag type to indicate whether state saving needs to ensure // stability of saved data wherever it is stored before returning // success @@ -79,8 +88,6 @@ class State_Saver State_Saver() = default; virtual ~State_Saver() = default; - static snapshot_time_t snapshot_time_now(); - /** * Return an object suitable for saving a simulation state as of a * particular moment in time, @param epoch @@ -91,7 +98,13 @@ class State_Saver */ virtual std::shared_ptr initialize_snapshot(State_Durability durability) = 0; - virtual std::shared_ptr initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) = 0; + virtual std::shared_ptr initialize_checkpoint_snapshot(int step, State_Durability durability) = 0; + + /** Clear data related to states prior to the last save state associated with this State_Saver + * + * @param mpi_rank The process' MPI rank that may be used for determining responsibility of cleanup. 
+ */ + virtual void clear_prior(int mpi_rank) = 0; /** * Execute any logic necessary to cleanly finish usage, and @@ -114,6 +127,18 @@ class State_Snapshot_Saver */ virtual void save_unit(std::string const& unit_name, boost::span data) = 0; + /** + * Capture the data from a single unit that can be serialized with a Boost binary_oarchive + */ + template + void archive_unit(const std::string &unit_name, T *item) { + vecbuf buffer; + boost::archive::binary_oarchive archive(buffer); + archive << (*item); + boost::span data(buffer.data(), buffer.size()); + this->save_unit(unit_name, data); + } + /** * Execute logic to complete the saving process * @@ -142,7 +167,13 @@ class State_Loader */ virtual std::shared_ptr initialize_snapshot() = 0; - virtual std::shared_ptr initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) = 0; + /** + * Return an object suitable for loading a checkpoint simulation state with all the required units. + * + * @param required_units Vector of all units that are required for a simulation state to be considered complete. + * @return A snapshot loader for a checkpoint that contains all of the required units. 
+ */ + virtual std::shared_ptr initialize_checkpoint_snapshot(const std::vector &required_units) = 0; /** * Execute any logic necessary to cleanly finish usage, and @@ -169,6 +200,18 @@ class State_Snapshot_Loader */ virtual void load_unit(const std::string &unit_name, std::vector &data) = 0; + /** + * Load unit data and immediately dearchive it with a Boost binary_iarchive + */ + template + void dearchive_unit(const std::string &unit_name, T *item) { + std::vector buffer; + this->load_unit(unit_name, buffer); + membuf data(buffer.data(), buffer.size()); + boost::archive::binary_iarchive archive(data); + archive >> (*item); + } + /** * Execute logic to complete the saving process * diff --git a/include/state_save_restore/State_Save_Utils.hpp b/include/state_save_restore/State_Save_Utils.hpp index 9713b660af..c3b6064db8 100644 --- a/include/state_save_restore/State_Save_Utils.hpp +++ b/include/state_save_restore/State_Save_Utils.hpp @@ -23,8 +23,8 @@ enum class State_Save_Mechanism { enum class State_Save_When { None = 0, EndOfRun, - FirstOfMonth, - StartOfRun + StartOfRun, + Checkpoint }; #endif diff --git a/src/NGen.cpp b/src/NGen.cpp index b4b8d14978..dcd4ea5544 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -710,7 +710,24 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { } } - simulation->run_catchments(); + { // optionally load a checkpoint if configured + auto checkpoint_loader = state_saving_config.checkpoint_loader(); + if (checkpoint_loader) { + LOG(LogLevel::INFO, "Loading checkpoint data from prior snapshot."); + const std::vector required_units = simulation->required_checkpoint_units(true); + std::shared_ptr snapshot_loader + = checkpoint_loader->initialize_checkpoint_snapshot(required_units); + simulation->load_checkpoint(snapshot_loader); + } + } + + if (state_saving_config.has_checkpoint_saver()) { + int checkpoint_frequency; + std::shared_ptr checkpoint_saver = state_saving_config.checkpoint_saver(&checkpoint_frequency); + 
simulation->run_catchments(checkpoint_saver, checkpoint_frequency); + } else { + simulation->run_catchments(); + } #if NGEN_WITH_MPI MPI_Barrier(MPI_COMM_WORLD); diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 8793567a65..e8ea930ffc 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -9,6 +9,8 @@ target_link_libraries(core PUBLIC target_link_libraries(core PRIVATE NGen::parallel + NGen::state_save_restore + Boost::serialization # needed for the NgenSimulation test to compile ) target_link_libraries(core PUBLIC diff --git a/src/core/Layer.cpp b/src/core/Layer.cpp index 432b918aa3..97d5cedcf3 100644 --- a/src/core/Layer.cpp +++ b/src/core/Layer.cpp @@ -94,10 +94,30 @@ void ngen::Layer::update_models(boost::span catchment_outflows, } } -void ngen::Layer::save_state_snapshot(std::shared_ptr snapshot_saver) +std::string ngen::Layer::unit_name() const { + return "lyr-" + std::to_string(this->get_id()); +} + +std::vector ngen::Layer::required_checkpoint_units() const { + std::vector units; + units.push_back(this->unit_name()); + for (auto const& id : processing_units) { + auto r = features.catchment_at(id); + auto r_c = std::dynamic_pointer_cast(r); + units.push_back(r_c->save_state_unit_name()); + } + return units; +} + +void ngen::Layer::save_checkpoint(std::shared_ptr snapshot_saver) { - // XXX Handle any of this class's own state as a meta-data unit + snapshot_saver->archive_unit(this->unit_name(), this); + // assume save_end_of_run ist just the BMI save + this->save_end_of_run(snapshot_saver); +} +void ngen::Layer::save_end_of_run(std::shared_ptr snapshot_saver) +{ for (auto const& id : processing_units) { auto r = features.catchment_at(id); auto r_c = std::dynamic_pointer_cast(r); @@ -105,10 +125,9 @@ void ngen::Layer::save_state_snapshot(std::shared_ptr snap } } -void ngen::Layer::load_state_snapshot(std::shared_ptr snapshot_loader) +void ngen::Layer::load_checkpoint(std::shared_ptr snapshot_loader) { - // XXX Handle any of 
this class's own state as a meta-data unit - + snapshot_loader->dearchive_unit(this->unit_name(), this); for (auto const& id : processing_units) { auto r = features.catchment_at(id); auto r_c = std::dynamic_pointer_cast(r); @@ -118,8 +137,7 @@ void ngen::Layer::load_state_snapshot(std::shared_ptr sna void ngen::Layer::load_hot_start(std::shared_ptr snapshot_loader) { - // XXX Handle any of this class's own state as a meta-data unit - + // no Layer metadata needs to load for this for (auto const& id : processing_units) { auto r = features.catchment_at(id); auto r_c = std::dynamic_pointer_cast(r); diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index 9addca80d4..09c2994bd0 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -8,13 +8,43 @@ #include "HY_Features.hpp" #endif +#if NGEN_WITH_PYTHON +#include +#endif + +#include "state_save_restore/vecbuf.hpp" #include "state_save_restore/State_Save_Utils.hpp" #include "state_save_restore/State_Save_Restore.hpp" + +#include +#include +#include +#include +#include + #include "parallel_utils.h" namespace { - const auto NGEN_UNIT_NAME = "ngen"; const auto TROUTE_UNIT_NAME = "troute"; + + #if NGEN_WITH_MPI + // Merge strings from multiple MPI ranks and broadcast the results to all ranks. Duplicates will be removed. 
+ std::vector collect_unique_strings(const std::vector &items, int mpi_rank, int mpi_num_procs) { + // MPI_Gather all items to rank 0 + std::vector all_items + = parallel::gather_strings(items, mpi_rank, mpi_num_procs); + if (mpi_rank == 0) { + // filter to only the unique items + std::sort(all_items.begin(), all_items.end()); + all_items.erase( + std::unique(all_items.begin(), all_items.end()), + all_items.end() + ); + } + // MPI_Broadcast unique, sorted results to all ranks + return parallel::broadcast_strings(all_items, mpi_rank, mpi_num_procs); + } + #endif // NGEN_WITH_MPI } NgenSimulation::NgenSimulation( @@ -57,6 +87,51 @@ void NgenSimulation::run_catchments() } } +void NgenSimulation::run_catchments(std::shared_ptr checkpoint_saver, int frequency) { + int num_times = get_num_output_times(); + if (frequency > num_times) { + LOG(LogLevel::WARNING, "The frequency of checkpoints (%d) is less than the number of simulation steps (%d). No checkpoints will be generated.", frequency, num_times); + } + + while (simulation_step_ < num_times) { + // Make room for this output step's results + catchment_outflows_.resize(catchment_outflows_.size() + catchment_indexes_.size(), 0.0); + nexus_downstream_flows_.resize(nexus_downstream_flows_.size() + nexus_indexes_.size(), 0.0); + + advance_models_one_output_step(); + + // advance_models_one_output_step runs the BMIs for the next time step, + // so the checkpoint state should include advancing the step + this->simulation_step_++; + + if (simulation_step_ < num_times) { + sim_time_->advance_timestep(); + } + + // this position allows creating a checkpoint on the very last step. This might be useful if t-route fails and we want to "skip" the final steps + if (simulation_step_ % frequency == 0) { + // Remote data is currently handled by MPI, so saving a checkpoint without retreiving all messages could lose data. + // An example would be checkpointing every 100 steps with two ranks. 
Rank 1 is faster than Rank 0, so it saves + // step 200 whilst Rank 0 has only saved step 100. If checkpoints are loaded, all data in MPI messages from Rank 1 + // for steps 100-199 will be lost, and the program will likely hang as Rank 0 waits for those messages from Rank 1. + // For now, set up a barrier to make sure all ranks can catch up before checkpointing. + // A possible later improvement would be the ability to store and recreate the MPI messages when saving/loading checkpoints. + this->sync_mpi_ranks(); + auto step_saver = checkpoint_saver->initialize_checkpoint_snapshot(simulation_step_, State_Saver::State_Durability::strict); + this->save_checkpoint(step_saver); + this->sync_mpi_ranks(); + checkpoint_saver->clear_prior(this->mpi_rank_); + } + } +} + +void NgenSimulation::sync_mpi_ranks() const { +#if NGEN_WITH_MPI + if (this->mpi_num_procs_ > 1) + MPI_Barrier(MPI_COMM_WORLD); +#endif // NGEN_WITH_MPI +} + void NgenSimulation::finalize() { #if NGEN_WITH_ROUTING if (this->py_troute_) { @@ -120,20 +195,10 @@ void NgenSimulation::advance_models_one_output_step() } -void NgenSimulation::save_state_snapshot(std::shared_ptr snapshot_saver) -{ - // TODO: save the current nexus data - auto unit_name = this->unit_name(); - // XXX Handle self, then recursively pass responsibility to Layers - for (auto& layer : layers_) { - layer->save_state_snapshot(snapshot_saver); - } -} - void NgenSimulation::save_end_of_run(std::shared_ptr snapshot_saver) { for (auto& layer : layers_) { - layer->save_state_snapshot(snapshot_saver); + layer->save_end_of_run(snapshot_saver); } #if NGEN_WITH_ROUTING if (this->mpi_rank_ == 0 && this->py_troute_) { @@ -148,11 +213,10 @@ void NgenSimulation::save_end_of_run(std::shared_ptr snaps #endif // NGEN_WITH_ROUTING } -void NgenSimulation::load_state_snapshot(std::shared_ptr snapshot_loader) { - // TODO: load the state data related to nexus outflows - auto unit_name = this->unit_name(); +void NgenSimulation::save_checkpoint(std::shared_ptr 
snapshot_saver) { + snapshot_saver->archive_unit(this->unit_name(), this); for (auto& layer : layers_) { - layer->load_state_snapshot(snapshot_loader); + layer->save_checkpoint(snapshot_saver); } } @@ -185,6 +249,37 @@ void NgenSimulation::load_hot_start(std::shared_ptr snaps #endif // NGEN_WITH_ROUTING } +void NgenSimulation::load_checkpoint(std::shared_ptr checkpoint_loader) { + checkpoint_loader->dearchive_unit(this->unit_name(), this); + for (auto& layer : layers_) { + layer->load_checkpoint(checkpoint_loader); + } +#if NGEN_WITH_PYTHON + // advance any forcing engine instances to make sure the first query doesn't get messy when catching up to the simulation's time + auto now = this->sim_time_->get_current_epoch_time(); + auto start = this->sim_time_->get_start_time(); + data_access::detail::ForcingsEngineStorage::instances.advance_to(now - start); +#endif // NGEN_WITH_PYTHON +} + + +std::vector NgenSimulation::required_checkpoint_units(bool merge_all_ranks) const { + std::vector units; + units.push_back(this->unit_name()); + for (const auto &layer : this->layers_) { + const auto layer_units = layer->required_checkpoint_units(); + for (const auto &unit : layer_units) + units.push_back(unit); + } +#if NGEN_WITH_MPI + if (merge_all_ranks && this->mpi_num_procs_ > 1) { + // merge all ranks' units so to ensure all read from the same source + units = collect_unique_strings(units, this->mpi_rank_, this->mpi_num_procs_); + } +#endif // NGEN_WITH_MPI + return units; +} + void NgenSimulation::make_troute(const std::string &t_route_config_file_with_path) { #if NGEN_WITH_ROUTING @@ -240,18 +335,10 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s for (const auto& nexus : nexus_indexes_) { local_nexus_ids.push_back(nexus.first); } - // MPI_Gather all nexus IDs into a single vector - std::vector all_nexus_ids = parallel::gather_strings(local_nexus_ids, mpi_rank_, mpi_num_procs_); - if (mpi_rank_ == 0) { - // filter to only the unique IDs - 
std::sort(all_nexus_ids.begin(), all_nexus_ids.end()); - all_nexus_ids.erase( - std::unique(all_nexus_ids.begin(), all_nexus_ids.end()), - all_nexus_ids.end() - ); - } - // MPI_Broadcast so all processes share the nexus IDs - all_nexus_ids = std::move(parallel::broadcast_strings(all_nexus_ids, mpi_rank_, mpi_num_procs_)); + // gather all nexus IDs into a single vector + std::vector all_nexus_ids = std::move(collect_unique_strings( + local_nexus_ids, mpi_rank_, mpi_num_procs_ + )); // MPI_Reduce to collect the results from processes if (mpi_rank_ == 0) { @@ -301,7 +388,7 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s // case a future implementation needs these two values from the ngen framework. int delta_time = sim_time_->get_output_interval_seconds(); - // model for routing + // if the t-route model was not created from a hot start load, make it now if (this->py_troute_ == NULL) { this->make_troute(t_route_config_file_with_path); } @@ -346,16 +433,14 @@ std::string NgenSimulation::get_timestamp_for_step(int step) const } template -void NgenSimulation::serialize(Archive& ar) { +void NgenSimulation::serialize(Archive& ar, const unsigned int version) { /* Handle `catchment_formulation_manager` specially in the * overall checkpoint/restart logic, so that we can subset * individual catchments and do other tricky things */ //ar & catchment_formulation_manager ar & simulation_step_; - ar & sim_time_; - - // Layers will be reconstructed, but their internal time keeping needs to be serialized + ar & (*sim_time_); // Nexus and catchment indexes could be re-generated, but only if // the set of catchments remains consistent diff --git a/src/realizations/catchment/Bmi_Module_Formulation.cpp b/src/realizations/catchment/Bmi_Module_Formulation.cpp index 359a257f88..97a5c1eedf 100644 --- a/src/realizations/catchment/Bmi_Module_Formulation.cpp +++ b/src/realizations/catchment/Bmi_Module_Formulation.cpp @@ -25,14 +25,14 @@ namespace realization 
{ // Rely on Formulation_Manager also using this->get_id() // as a unique key for the individual catchment // formulations - saver->save_unit(this->get_id(), data); + saver->save_unit(this->save_state_unit_name(), data); this->free_serialization_state(); } void Bmi_Module_Formulation::load_state(std::shared_ptr loader) { std::vector buffer; - loader->load_unit(this->get_id(), buffer); + loader->load_unit(this->save_state_unit_name(), buffer); boost::span data(buffer.data(), buffer.size()); this->load_serialization_state(data); } diff --git a/src/realizations/catchment/Bmi_Multi_Formulation.cpp b/src/realizations/catchment/Bmi_Multi_Formulation.cpp index ed5b296193..bb6b8a627a 100644 --- a/src/realizations/catchment/Bmi_Multi_Formulation.cpp +++ b/src/realizations/catchment/Bmi_Multi_Formulation.cpp @@ -26,7 +26,7 @@ using namespace realization; void Bmi_Multi_Formulation::save_state(std::shared_ptr saver) { - LOG(LogLevel::DEBUG, "Saving state for Multi-BMI %s", this->get_id()); + LOG(LogLevel::DEBUG, "Saving state for Multi-BMI %s", this->save_state_unit_name()); vecbuf data; boost::archive::binary_oarchive archive(data); // serialization function handles freeing the sub-BMI states after archiving them @@ -38,13 +38,13 @@ void Bmi_Multi_Formulation::save_state(std::shared_ptr sav bmi->free_serialization_state(); } boost::span span(data.data(), data.size()); - saver->save_unit(this->get_id(), span); + saver->save_unit(this->save_state_unit_name(), span); } void Bmi_Multi_Formulation::load_state(std::shared_ptr loader) { - LOG(LogLevel::DEBUG, "Loading save state for Multi-BMI %s", this->get_id()); + LOG(LogLevel::DEBUG, "Loading save state for Multi-BMI %s", this->save_state_unit_name()); std::vector data; - loader->load_unit(this->get_id(), data); + loader->load_unit(this->save_state_unit_name(), data); membuf stream(data.data(), data.size()); boost::archive::binary_iarchive archive(stream); archive >> (*this); diff --git a/src/state_save_restore/File_Per_Unit.cpp 
b/src/state_save_restore/File_Per_Unit.cpp index 6d355d4c93..da466d4aa4 100644 --- a/src/state_save_restore/File_Per_Unit.cpp +++ b/src/state_save_restore/File_Per_Unit.cpp @@ -17,18 +17,28 @@ #error "No Filesystem library implementation available" #endif +#include +#include +#include + #include #include -namespace unit_saving_utils { - std::string format_epoch(State_Saver::snapshot_time_t epoch) - { - time_t t = std::chrono::system_clock::to_time_t(epoch); - std::tm tm = *std::gmtime(&t); - - std::stringstream tss; - tss << std::put_time(&tm, "%Y-%m-%dT%H:%M:%S"); - return tss.str(); +namespace { + // Populate a vector of paths with subfolders with names that can be interpreted as ints. + // The vector will be sorted by highest numeric representation first. + void ordered_checkpoint_subfolders(const std::string &root, std::vector &subdirs) { + for (const auto &subdir : directory_iterator(root)) { + path subdir_path = subdir.path(); + // make sure subfolder is a number from a timestep + if (subdir_path.filename().string().find_first_not_of("0123456789") == std::string::npos) { + subdirs.push_back(subdir); + } + } + // sort options by the highest number representation + std::sort(subdirs.begin(), subdirs.end(), [](const path &a, const path &b) { + return std::stoi(a.filename().string()) > std::stoi(b.filename().string()); + }); } } @@ -62,17 +72,27 @@ File_Per_Unit_Saver::File_Per_Unit_Saver(std::string base_path) File_Per_Unit_Saver::~File_Per_Unit_Saver() = default; std::shared_ptr File_Per_Unit_Saver::initialize_snapshot(State_Durability durability) { - // TODO return std::make_shared(path(this->base_path_), durability); } -std::shared_ptr File_Per_Unit_Saver::initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) +std::shared_ptr File_Per_Unit_Saver::initialize_checkpoint_snapshot(int step, State_Durability durability) { - path checkpoint_path = path(this->base_path_) / unit_saving_utils::format_epoch(epoch); + path checkpoint_path = 
path(this->base_path_) / std::to_string(step); create_directory(checkpoint_path); return std::make_shared(checkpoint_path, durability); } +void File_Per_Unit_Saver::clear_prior(int mpi_rank) { + if (mpi_rank == 0) { // reserve file system deletion to just the main MPI rank + std::vector subdirs; + ordered_checkpoint_subfolders(this->base_path_, subdirs); + // delete all checkpoint directories save the most recent one + for (int i = 1; i < subdirs.size(); ++i) { + remove_all(subdirs[i]); + } + } +} + void File_Per_Unit_Saver::finalize() { // nothing to be done @@ -188,9 +208,26 @@ std::shared_ptr File_Per_Unit_Loader::initialize_snapshot return std::make_shared(path(dir_path_)); } -std::shared_ptr File_Per_Unit_Loader::initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) +std::shared_ptr File_Per_Unit_Loader::initialize_checkpoint_snapshot(const std::vector &required_units) { - path checkpoint_path = path(dir_path_) / unit_saving_utils::format_epoch(epoch);; - return std::make_shared(checkpoint_path); + std::vector options; + ordered_checkpoint_subfolders(this->dir_path_, options); + for (const path &option : options) { + auto loader = std::make_shared(option); + bool passes = true; + for (const auto &unit : required_units) { + if (!loader->has_unit(unit)) { + passes = false; + break; + } + } + if (passes) { + LOG(LogLevel::INFO, "Loading state from checkpoint step " + option.filename().string()); + return loader; + } + } + std::string error = "No checkpoint location found with all required units in root directory " + this->dir_path_; + LOG(LogLevel::FATAL, error); + throw std::runtime_error(error); } diff --git a/src/state_save_restore/State_Save_Restore.cpp b/src/state_save_restore/State_Save_Restore.cpp index 4a54b1a21c..7486936979 100644 --- a/src/state_save_restore/State_Save_Restore.cpp +++ b/src/state_save_restore/State_Save_Restore.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "ewts_ngen/logger.hpp" @@ -8,6 +9,9 @@ #include +#include 
+#include + State_Save_Config::State_Save_Config(boost::property_tree::ptree const& tree) { auto maybe = tree.get_child_optional("state_saving"); @@ -19,6 +23,8 @@ State_Save_Config::State_Save_Config(boost::property_tree::ptree const& tree) } bool hot_start = false; + bool checkpoint_load = false; + bool checkpoint_save = false; for (const auto& saving_config : *maybe) { try { auto& subtree = saving_config.second; @@ -27,13 +33,25 @@ State_Save_Config::State_Save_Config(boost::property_tree::ptree const& tree) auto where = subtree.get("path"); auto how = subtree.get("type"); auto when = subtree.get("when"); + auto frequency = subtree.get_optional("frequency"); - instance i{direction, what, where, how, when}; + instance i{direction, what, where, how, when, frequency}; if (i.timing_ == State_Save_When::StartOfRun && i.direction_ == State_Save_Direction::Load) { if (hot_start) throw std::runtime_error("Only one hot start state saving configuration is allowed."); hot_start = true; } + if (i.timing_ == State_Save_When::Checkpoint) { + if (i.direction_ == State_Save_Direction::Load) { + if (checkpoint_load) + throw std::runtime_error("Only one checkpointing load state saving configuration is allowed."); + checkpoint_load = true; + } else if (i.direction_ == State_Save_Direction::Save) { + if (checkpoint_save) + throw std::runtime_error("Only one checkpointing save state saving configuration is allowed."); + checkpoint_save = true; + } + } instances_.push_back(i); } catch (std::exception &e) { LOG("Bad state saving config: " + std::string(e.what()), LogLevel::WARNING); @@ -86,12 +104,50 @@ std::unique_ptr State_Save_Config::hot_start() const { } } } - return std::unique_ptr(); + return NULL; } -State_Save_Config::instance::instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing) +std::unique_ptr State_Save_Config::checkpoint_loader() const { + for (const auto &i : this->instances_) 
{ + if (i.direction_ == State_Save_Direction::Load && i.timing_ == State_Save_When::Checkpoint) { + if (i.mechanism_ == State_Save_Mechanism::FilePerUnit) { + return std::make_unique(i.path_); + } else { + Logger::LogAndThrow("State_Save_Config: Loading mechanism " + i.mechanism_string() + " is not supported for checkpoint loading."); + } + } + } + return NULL; +} + +bool State_Save_Config::has_checkpoint_saver() const { + for (const auto &i : this->instances_) { + if (i.direction_ == State_Save_Direction::Save && i.timing_ == State_Save_When::Checkpoint) { + return true; + } + } + return false; +} + +std::shared_ptr State_Save_Config::checkpoint_saver(int *const frequency) const { + for (const auto &i : this->instances_) { + if (i.direction_ == State_Save_Direction::Save && i.timing_ == State_Save_When::Checkpoint) { + if (i.mechanism_ == State_Save_Mechanism::FilePerUnit) { + *frequency = i.frequency_; + return std::make_shared(i.path_); + } else { + Logger::LogAndThrow("State_Save_Config: Saving mechanism " + i.mechanism_string() + " is not supported for checkpoint saving."); + } + } + } + Logger::LogAndThrow("State_Save_Config: Failed to find a suitable checkpoint saving configuration."); + return NULL; +} + +State_Save_Config::instance::instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing, boost::optional frequency) : label_(label) , path_(path) + , frequency_{-1} { if (direction == "save") { direction_ = State_Save_Direction::Save; @@ -115,10 +171,22 @@ State_Save_Config::instance::instance(std::string const& direction, std::string if (timing == "EndOfRun") { timing_ = State_Save_When::EndOfRun; - } else if (timing == "FirstOfMonth") { - timing_ = State_Save_When::FirstOfMonth; } else if (timing == "StartOfRun") { timing_ = State_Save_When::StartOfRun; + } else if (timing == "Checkpoint") { // starts with "Checkpoint" + timing_ = State_Save_When::Checkpoint; + if 
(direction_ == State_Save_Direction::Save) { + if (!frequency.has_value()) { + Logger::LogAndThrow("The checkpoint save configuration is missing a 'frequency' of checkpointing."); + } + frequency_ = frequency.get(); + // make sure the frequency makes sense + if (frequency_ <= 0) { + std::stringstream ss; + ss << "The frequency of a checkpoint save must be greater than 0. The configured value is " << frequency_; + Logger::LogAndThrow(ss.str()); + } + } } else { std::string message = "Unrecognized state saving timing '" + timing + "'"; std::string throw_msg; throw_msg.assign(message); @@ -143,12 +211,3 @@ State_Snapshot_Saver::State_Snapshot_Saver(State_Saver::State_Durability durabil { } - -State_Saver::snapshot_time_t State_Saver::snapshot_time_now() { -#if __cplusplus < 201703L // C++ < 17 - auto now = std::chrono::system_clock::now(); - return std::chrono::time_point_cast(now); -#else - return std::chrono::floor(std::chrono::system_clock::now()); -#endif -} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index a6b26ad200..1f4919bf8a 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -461,6 +461,7 @@ ngen_add_test( NGen::logging NGen::core_mediator NGen::ngen_bmi + NGen::forcing ) ########################### Netcdf Forcing Tests