Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ set(Boost_USE_MULTITHREADED ON)
set(Boost_USE_STATIC_RUNTIME OFF)
if(CMAKE_CXX_STANDARD LESS 17)
# requires non-header filesystem for state saving if C++ 11 or lower
find_package(Boost 1.86.0 REQUIRED COMPONENTS system filesystem)
find_package(Boost 1.86.0 REQUIRED COMPONENTS system filesystem serialization)
else()
find_package(Boost 1.86.0 REQUIRED)
find_package(Boost 1.86.0 REQUIRED COMPONENTS serialization)
endif()

# -----------------------------------------------------------------------------
Expand Down
28 changes: 22 additions & 6 deletions include/core/Layer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@
#include "Simulation_Time.hpp"
#include "State_Exception.hpp"
#include "geojson/FeatureBuilder.hpp"
#include "state_save_restore/State_Save_Restore.hpp"
#include <boost/core/span.hpp>
#include <boost/serialization/serialization.hpp>

namespace hy_features
{
class HY_Features;
class HY_Features_MPI;
}

class State_Snapshot_Saver;
class State_Snapshot_Loader;

namespace ngen
{

Expand Down Expand Up @@ -113,10 +112,20 @@ namespace ngen
std::unordered_map<std::string, int> &nexus_indexes,
int current_step);

virtual void save_state_snapshot(std::shared_ptr<State_Snapshot_Saver> snapshot_saver);
virtual void load_state_snapshot(std::shared_ptr<State_Snapshot_Loader> snapshot_loader);
/**
* Save the current state including metatdata related to current layer times
*/
virtual void save_checkpoint(std::shared_ptr<State_Snapshot_Saver> snapshot_saver);
/**
* Save the current state excluding metatdata related to current layer times
*/
virtual void save_end_of_run(std::shared_ptr<State_Snapshot_Saver> snapshot_saver);
virtual void load_checkpoint(std::shared_ptr<State_Snapshot_Loader> snapshot_loader);
virtual void load_hot_start(std::shared_ptr<State_Snapshot_Loader> snapshot_loader);

std::string unit_name() const;
virtual std::vector<std::string> required_checkpoint_units() const;

protected:

const LayerDescription description;
Expand All @@ -127,7 +136,14 @@ namespace ngen
feature_type& features;
//TODO is this really required at the top level? or can this be moved to SurfaceLayer?
const geojson::GeoJSON catchment_data;
long output_time_index;
long output_time_index;

// Serialization template will be defined and instantiated in the .cpp file
friend class boost::serialization::access;
template <class Archive>
void serialize(Archive& ar, const unsigned int version) {
ar & this->simulation_time;
}

};
}
Expand Down
23 changes: 20 additions & 3 deletions include/core/NgenSimulation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class State_Snapshot_Loader;
#include <unordered_map>
#include <string>

#include <boost/serialization/serialization.hpp>

// Contains all of the dynamic state and logic to run a NextGen hydrologic simulation
class NgenSimulation
{
Expand Down Expand Up @@ -55,6 +57,9 @@ class NgenSimulation
*/
void run_catchments();

// Alternative method of running catchments that includes saving checkpoints at a desired frequency.
void run_catchments(std::shared_ptr<State_Saver> checkpoint_saver, int frequency);

// Tear down of any items stored on the NgenSimulation object that could throw errors and, thus, should be kept separate from the deconstructor.
void finalize();

Expand All @@ -69,8 +74,6 @@ class NgenSimulation
size_t get_num_output_times() const;
std::string get_timestamp_for_step(int step) const;

void save_state_snapshot(std::shared_ptr<State_Snapshot_Saver> snapshot_saver);
void load_state_snapshot(std::shared_ptr<State_Snapshot_Loader> snapshot_loader);
/**
* Saves a snapshot state that's intended to be run at the end of a simulation.
*
Expand All @@ -79,10 +82,23 @@ class NgenSimulation
void save_end_of_run(std::shared_ptr<State_Snapshot_Saver> snapshot_saver);
// Load a snapshot of the end of a previous run. This will create a T-Route python adapter if the loader finds a unit for it and the config path is not empty.
void load_hot_start(std::shared_ptr<State_Snapshot_Loader> snapshot_loader, const std::string &t_route_config_file_with_path);
// Load a snapshot of a checkpoint from a previous run. This will create a T-Route python adapter if the loader finds a unit for it (most likely to happen if the checkpoint is derived from )
void load_checkpoint(std::shared_ptr<State_Snapshot_Loader> checkpoint_loader);
// Save a snapshot that preserves data related to the current time and could be loaded to jump a simulation into the middle of a run.
void save_checkpoint(std::shared_ptr<State_Snapshot_Saver> snapshot_saver);

/**
* Get a vector of all the required units the NgenSimulation would need to load a checkpoint state.
*
* @param merge_all_ranks Whether to also get the units from other MPI ranks. This should only be `true` when calling blocking MPI processes is safe for the program.
*/
std::vector<std::string> required_checkpoint_units(bool merge_all_ranks) const;

private:
void advance_models_one_output_step();

inline void sync_mpi_ranks() const;

int simulation_step_;

std::shared_ptr<Simulation_Time> sim_time_;
Expand All @@ -106,8 +122,9 @@ class NgenSimulation
int mpi_num_procs_;

// Serialization template will be defined and instantiated in the .cpp file
friend class boost::serialization::access;
template <class Archive>
void serialize(Archive& ar);
void serialize(Archive& ar, const unsigned int version);
};

#endif
11 changes: 11 additions & 0 deletions include/forcing/ForcingsEngineDataProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ struct ForcingsEngineStorage {
data_.clear();
}

/** Advance all forcing engine instances to update to the requested time.
* @param time The amount of seconds from the beginning of the simulation the forcing models will be advanaced to.
* This should use `0.0` as the start time and allow the backing models to update their own times accordingly.
*/
void advance_to(double time) {
for (auto &provider : data_) {
double bmi_start = provider.second->GetStartTime();
provider.second->UpdateUntil(time + bmi_start);
}
}

private:
//! Instance map of underlying BMI models.
std::unordered_map<key_type, value_type> data_;
Expand Down
5 changes: 5 additions & 0 deletions include/realizations/catchment/Bmi_Formulation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ namespace realization {
*/
virtual void load_hot_start(std::shared_ptr<State_Snapshot_Loader> loader) = 0;

// Unit name used for identifying a save state ID
inline std::string save_state_unit_name() const {
return this->get_id();
}

/**
* Convert a time value from the model to an epoch time in seconds.
*
Expand Down
17 changes: 17 additions & 0 deletions include/simulation_time/Simulation_Time.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <string>
#include <stdexcept>

#include <boost/serialization/serialization.hpp>

/**
* @brief simulation_time_params providing configuration information for simulation time period.
*/
Expand Down Expand Up @@ -104,6 +106,15 @@ class Simulation_Time
return current_date_time_epoch;
}

/**
* @brief Accessor to the the current simulation time
* @return start_date_time_epoch
*/
time_t get_start_time()
{
return start_date_time_epoch;
}

/**
* @brief Accessor to the current timestamp string
* @return current_timestamp
Expand Down Expand Up @@ -168,6 +179,12 @@ class Simulation_Time

private:

friend class boost::serialization::access;
template <class Archive>
void serialize(Archive& ar, const unsigned int version) {
ar & this->current_date_time_epoch;
}

int total_output_times;
int simulation_total_time_seconds;
int output_interval_seconds;
Expand Down
6 changes: 4 additions & 2 deletions include/state_save_restore/File_Per_Unit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ class File_Per_Unit_Saver : public State_Saver

std::shared_ptr<State_Snapshot_Saver> initialize_snapshot(State_Durability durability) override;

std::shared_ptr<State_Snapshot_Saver> initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) override;
std::shared_ptr<State_Snapshot_Saver> initialize_checkpoint_snapshot(int step, State_Durability durability) override;

void clear_prior(int mpi_rank) override;

void finalize() override;

Expand All @@ -30,7 +32,7 @@ class File_Per_Unit_Loader : public State_Loader

std::shared_ptr<State_Snapshot_Loader> initialize_snapshot() override;

std::shared_ptr<State_Snapshot_Loader> initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) override;
std::shared_ptr<State_Snapshot_Loader> initialize_checkpoint_snapshot(const std::vector<std::string> &required_units) override;
private:
std::string dir_path_;
};
Expand Down
57 changes: 50 additions & 7 deletions include/state_save_restore/State_Save_Restore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@

#include <boost/property_tree/ptree_fwd.hpp>
#include <boost/core/span.hpp>
#include <boost/serialization/serialization.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>

#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "vecbuf.hpp"
#include "State_Save_Utils.hpp"

class State_Saver;
Expand Down Expand Up @@ -49,15 +53,22 @@ class State_Save_Config
*/
std::unique_ptr<State_Loader> hot_start() const;

std::unique_ptr<State_Loader> checkpoint_loader() const;

bool has_checkpoint_saver() const;

std::shared_ptr<State_Saver> checkpoint_saver(int *const frequency) const;

struct instance
{
instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing);
instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing, boost::optional<int> frequency);

State_Save_Direction direction_;
State_Save_Mechanism mechanism_;
State_Save_When timing_;
std::string label_;
std::string path_;
int frequency_;

std::string mechanism_string() const;
};
Expand All @@ -69,8 +80,6 @@ class State_Save_Config
class State_Saver
{
public:
using snapshot_time_t = std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>;

// Flag type to indicate whether state saving needs to ensure
// stability of saved data wherever it is stored before returning
// success
Expand All @@ -79,8 +88,6 @@ class State_Saver
State_Saver() = default;
virtual ~State_Saver() = default;

static snapshot_time_t snapshot_time_now();

/**
* Return an object suitable for saving a simulation state as of a
* particular moment in time, @param epoch
Expand All @@ -91,7 +98,13 @@ class State_Saver
*/
virtual std::shared_ptr<State_Snapshot_Saver> initialize_snapshot(State_Durability durability) = 0;

virtual std::shared_ptr<State_Snapshot_Saver> initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) = 0;
virtual std::shared_ptr<State_Snapshot_Saver> initialize_checkpoint_snapshot(int step, State_Durability durability) = 0;

/** Clear data related to states prior to the last save state associated with this State_Saver
*
* @param mpi_rank The process' MPI rank that may be used for determining responsibility of cleanup.
*/
virtual void clear_prior(int mpi_rank) = 0;

/**
* Execute any logic necessary to cleanly finish usage, and
Expand All @@ -114,6 +127,18 @@ class State_Snapshot_Saver
*/
virtual void save_unit(std::string const& unit_name, boost::span<char const> data) = 0;

/**
* Capture the data from a single unit that can be serialized with a Boost binary_oarchive
*/
template <class T>
void archive_unit(const std::string &unit_name, T *item) {
vecbuf<char> buffer;
boost::archive::binary_oarchive archive(buffer);
archive << (*item);
boost::span<char> data(buffer.data(), buffer.size());
this->save_unit(unit_name, data);
}

/**
* Execute logic to complete the saving process
*
Expand Down Expand Up @@ -142,7 +167,13 @@ class State_Loader
*/
virtual std::shared_ptr<State_Snapshot_Loader> initialize_snapshot() = 0;

virtual std::shared_ptr<State_Snapshot_Loader> initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) = 0;
/**
* Return an object suitable for loading a checkpoint simulation state with all the required units.
*
* @param required_units Vector of all units that are required for a simulation state to be considered complete.
* @param checkpoint_step Pointer that will be set to the step number of the valid state.
*/
virtual std::shared_ptr<State_Snapshot_Loader> initialize_checkpoint_snapshot(const std::vector<std::string> &required_units) = 0;

/**
* Execute any logic necessary to cleanly finish usage, and
Expand All @@ -169,6 +200,18 @@ class State_Snapshot_Loader
*/
virtual void load_unit(const std::string &unit_name, std::vector<char> &data) = 0;

/**
* Load unit data and immediately dearchive it with a Boost binary_iarchive
*/
template<class T>
void dearchive_unit(const std::string &unit_name, T *item) {
std::vector<char> buffer;
this->load_unit(unit_name, buffer);
membuf data(buffer.data(), buffer.size());
boost::archive::binary_iarchive archive(data);
archive >> (*item);
}

/**
* Execute logic to complete the saving process
*
Expand Down
4 changes: 2 additions & 2 deletions include/state_save_restore/State_Save_Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ enum class State_Save_Mechanism {
enum class State_Save_When {
None = 0,
EndOfRun,
FirstOfMonth,
StartOfRun
StartOfRun,
Checkpoint
};

#endif
19 changes: 18 additions & 1 deletion src/NGen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,24 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) {
}
}

simulation->run_catchments();
{ // optionally load a checkpoint if configured
auto checkpoint_loader = state_saving_config.checkpoint_loader();
if (checkpoint_loader) {
LOG(LogLevel::INFO, "Loading checkpoint data from prior snapshot.");
const std::vector<std::string> required_units = simulation->required_checkpoint_units(true);
std::shared_ptr<State_Snapshot_Loader> snapshot_loader
= checkpoint_loader->initialize_checkpoint_snapshot(required_units);
simulation->load_checkpoint(snapshot_loader);
}
}

if (state_saving_config.has_checkpoint_saver()) {
int checkpoint_frequency;
std::shared_ptr<State_Saver> checkpoint_saver = state_saving_config.checkpoint_saver(&checkpoint_frequency);
simulation->run_catchments(checkpoint_saver, checkpoint_frequency);
} else {
simulation->run_catchments();
}

#if NGEN_WITH_MPI
MPI_Barrier(MPI_COMM_WORLD);
Expand Down
2 changes: 2 additions & 0 deletions src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ target_link_libraries(core PUBLIC

target_link_libraries(core PRIVATE
NGen::parallel
NGen::state_save_restore
Boost::serialization # needed for the NgenSimulation test to compile
)

target_link_libraries(core PUBLIC
Expand Down
Loading
Loading