import os
import logging
import time
import datetime
import json
import pandas as pd
from pathlib import Path
from topsim.core.config import Config
from topsim.core.monitor import Monitor
from topsim.core.scheduler import Scheduler
from topsim.core.cluster import Cluster
from topsim.core.buffer import Buffer
from topsim.core.planner import Planner
from topsim.core.delay import DelayModel
LOGGER = logging.getLogger(__name__)
#: Wall-clock interval, in seconds, between the progress ("heartbeat") log
#: messages emitted by ``Simulation.start`` while running to completion.
HEARTBEAT_INT_SECONDS = 10 * 60
class Simulation:
    """
    The Simulation class is a wrapper for all Actors; we start the simulation
    through the simulation class, which in turn invokes the initial Actors and
    monitoring, and provides the conditions for checking if the simulation has
    finished.

    Parameters
    ----------
    env : :py:obj:`simpy.Environment` object
        The discrete-event simulation environment. This is the way TOpSim
        simulation maintains state across the different actors,
        and interfaces with the simpy processes.
    config : Path or str
        Path to the simulation JSON configuration file.
    instrument : :py:obj:`~topsim.core.instrument.Instrument` subclass
        User-defined implementation of the Instrument class. The class itself
        (not an instance) is passed in; it is instantiated here with the
        environment, configuration, planner and scheduler. Its ``name``
        attribute is used to look up the maximum ingest requirements.
    planning_model : :py:obj:`~topsim.algorithms.planning.Planning` object
        User-defined implementation of the planning algorithm class.
    scheduling : :py:obj:`~topsim.algorithms.scheduling.Algorithm`
        User-defined implementation of the scheduling algorithm
        :py:obj:`abc.ABC`. Its ``ingest_requirements`` attribute is set from
        the configuration's maximum ingest for the instrument.
    delay : :py:obj:`~topsim.core.delay.DelayModel`, optional
        Delay model handed to the planner. If omitted, a model with
        probability ``0.0`` and ``DelayModel.DelayDegree.NONE`` is used.
    timestamp : float, optional
        Optional simulation start time, as a POSIX timestamp; this is useful
        for testing, to ensure we name the output and the tests match up.
        Also useful if you do not want to use the current wall-clock time as
        the name of the simulation output.
    to_file : bool, optional
        ``True`` if the simulation results are to be written to the HDF5
        store at ``hdf5_path``; ``False`` will return pandas DataFrame objects
        at the completion of
        :py:meth:`~topsim.core.simulation.Simulation.start`.
    hdf5_path : str or Path, optional
        Path to the HDF5 output store. Required when ``to_file`` is ``True``;
        if the file already exists, results are appended to it.
    use_task_data : bool, optional
        Forwarded to the Planner and recorded in the ``params`` output.
    use_edge_data : bool, optional
        Forwarded to the Planner and recorded in the ``params`` output.
    **kwargs
        ``delimiters`` (str): used to separate different simulations within a
        single HDF5 store (see Notes). Defaults to ``''``.

    Raises
    ------
    ValueError
        If ``to_file`` is ``True`` but no ``hdf5_path`` is provided.

    Notes
    -----
    If ``to_file`` is left as ``False``, simulation results and output will be
    returned as pandas DataFrames (see
    :py:meth:`~topsim.core.simulation.Simulation.start`). This is designed for
    running multiple simulations, allowing for the appending of individual
    simulation results to a 'global' :py:obj:`~pandas.DataFrame`. Output to
    file is written to an HDF5 store via :py:obj:`pandas.HDFStore`.

    Passing in the option ``delimiters`` provides a way of differentiating
    between multiple simulations within a single HDF5 store (for example,
    in an experiment). A typical experimental loop may involve the following
    structure:

    >>> for heuristic in list_of_scheduling_heuristics:
    ...     for algorithm in list_of_planning_algorithms:
    ...         for cfg in list_of_system_configs:
    ...             delimiter = f'{heuristic}/{algorithm}/{cfg}'

    Results are stored under ``{timestamp}/{delimiters}/{config name}`` with
    the sub-keys ``sim``, ``summary`` and ``params``, so each simulation can
    be filtered nicely when querying the HDF5 output file:

    >>> store = pd.HDFStore('path/to/output.h5')
    >>> # Returns a dataframe of simulation results
    >>> store['<timestamp>/heuristic_1/algorithm_3/cfg/sim']

    Examples
    --------
    Standard simulation with data frame output

    >>> env = simpy.Environment()
    >>> config = Path('path/to/config.json')
    >>> plan = PlanningModel()
    >>> sched = SchedulingModel()
    >>> simulation = Simulation(env, config, CustomInstrument, plan, sched)

    If we want delays in the model:

    >>> dm = DelayModel(prob=0.1, dist='normal', dm=DelayModel.DelayDegree.LOW)
    >>> simulation = Simulation(
    ...     env, config, CustomInstrument, plan, sched, delay=dm
    ... )

    Running a simulation to completion:

    >>> simulation.start()

    Running a simulation for a specific time period, then resuming:

    >>> simulation.start(runtime=100)
    >>> # Check current status of simulation
    >>> simulation.resume(until=150)
    """

    def __init__(
            self,
            env,
            config,
            instrument,
            planning_model,
            scheduling,
            delay=None,
            timestamp=None,
            to_file=False,
            hdf5_path=None,
            use_task_data=False,
            use_edge_data=True,
            **kwargs
    ):
        #: :py:obj:`simpy.Environment` object
        self.env = env

        #: :py:obj:`~topsim.core.monitor.Monitor` instance
        if timestamp is not None:
            self.monitor = Monitor(self, timestamp)
            self._timestamp = datetime.datetime.fromtimestamp(timestamp)
        else:
            self._timestamp = datetime.datetime.now()
            self.monitor = Monitor(self, self._timestamp)

        # Process necessary config files
        self._cfg_path = Path(config)  #: Configuration path
        self._cfg = Config(config)

        # Initialise Actor and Resource objects
        #: :py:obj:`~topsim.core.cluster.Cluster` instance
        self.cluster = Cluster(env, self._cfg)

        if not delay:
            # TODO Have this approach replicated so we don't specify the
            # model outside the simulation.
            delay = DelayModel(0.0, "normal", DelayModel.DelayDegree.NONE)
        #: :py:obj:`~topsim.core.planner.Planner` instance
        self.planner = Planner(
            env, self.cluster, planning_model, use_task_data, use_edge_data,
            delay
        )
        #: :py:obj:`~topsim.core.buffer.Buffer` instance
        self.buffer = Buffer(env, self.cluster, self.planner, self._cfg)

        scheduling.ingest_requirements = self._cfg.get_max_ingest(
            instrument.name
        )
        #: :py:obj:`~topsim.core.scheduler.Scheduler` instance
        self.scheduler = Scheduler(
            env, self.buffer, self.cluster, self.planner, scheduling
        )

        #: User-defined :py:obj:`~topsim.core.instrument.Instrument` instance
        self.instrument = instrument(
            env=self.env,
            config=self._cfg,
            planner=self.planner,
            scheduler=self.scheduler
        )

        #: :py:obj:`bool` Flag for writing simulation output to an HDF5 store.
        self.to_file = to_file
        # HDF5 store handle; stays None unless file output is enabled and the
        # store could be opened, in which case start() falls back to
        # returning DataFrames.
        self._hdf5_store = None
        if self.to_file and hdf5_path:
            try:
                if os.path.exists(hdf5_path):
                    LOGGER.warning(
                        'Output HDF5 path already exists, '
                        'simulation appended to existing file'
                    )
                # Open then close immediately to check the store is usable;
                # it is re-opened in start() when results are written.
                self._hdf5_store = pd.HDFStore(hdf5_path)
                self._hdf5_store.close()
            except Exception as e:
                LOGGER.error('%s', e)
                self._hdf5_store = None
        elif self.to_file:
            # Covers both a missing (None) and an empty path.
            raise ValueError(
                'Attempted to initialise Simulation object that outputs '
                'to file without providing file path'
            )
        else:
            LOGGER.info('Simulation output will not be stored directly to file')

        #: Used to separate different simulations in HDF5 output
        self._delimiters = kwargs.get('delimiters', '')
        self.params = {
            "use_task_data": [use_task_data],
            "use_edge_data": [use_edge_data],
        }
        self.running = False

    def start(self, runtime=-1):
        """
        Run the simulation, either for the specified runtime, OR until the
        exit conditions are reached.

        The exit conditions (see :py:meth:`is_finished`) are:

        * There are no more observations to process,
        * There is nothing left in the Buffer,
        * The Scheduler is not waiting to allocate machines to resources,
        * There are no tasks still running on the cluster.

        Parameters
        ----------
        runtime : int
            Nominated runtime of the simulation. If the simulation length is
            known, pass that as the argument. If not, passing in a
            non-positive value (typically, just -1) will run the simulation
            until the exit condition is reached.

        Returns
        -------
        None
            If results were written to the HDF5 store (``to_file`` is True
            and the store was opened successfully during initialisation).
        (pandas.DataFrame, pandas.DataFrame)
            Otherwise: the monitor's global simulation data and the final
            task data (see :py:meth:`_generate_final_task_data`).

        Raises
        ------
        RuntimeError
            If ``start()`` has already been called on this simulation.
        """
        if self.running:
            raise RuntimeError(
                "start() has already been called! "
                "Use resume() to continue a simulation that is already in "
                "progress."
            )
        self.running = True
        self.env.process(self.monitor.run())
        self.env.process(self.instrument.run())
        self.env.process(self.cluster.run())
        self.scheduler.start()
        self.env.process(self.scheduler.run())
        self.env.process(self.buffer.run())

        if runtime > 0:
            self.env.run(until=runtime)
        else:
            # Advance one time unit at a time so the finish conditions are
            # re-checked after every step.
            heartbeat = time.monotonic()  # monotonic never goes backwards
            while not self.is_finished():
                self.env.run(self.env.now + 1)
                now = time.monotonic()
                if now - heartbeat > HEARTBEAT_INT_SECONDS:
                    # Periodic progress message for long-running simulations.
                    wall_time = datetime.datetime.now().strftime(
                        "%Y-%m-%d %H:%M:%S"
                    )
                    LOGGER.info(
                        "Wall time: %s / sim_time: %d", wall_time, self.env.now
                    )
                    heartbeat = now

        LOGGER.info("Simulation Finished @ %s", self.env.now)
        self.monitor.collate_events()
        if self.to_file and self._hdf5_store is not None:
            self._hdf5_store.open()
            try:
                self._compose_hdf5_output(self.monitor.df, self.monitor.events)
            finally:
                # Release the file handle even if writing fails.
                self._hdf5_store.close()
            return None
        return self.monitor.df, self._generate_final_task_data()

    def resume(self, until):
        """
        Resume a simulation for a period of time.

        Useful for testing purposes, as we do not re-initialise the process
        calls as we do in
        :py:obj:`~core.topsim.simulation.Simulation.start`.

        Parameters
        ----------
        until : int
            The (non-inclusive) :py:obj:`Simpy.env.now` timestep that we want
            to continue to in the simulation.

        Returns
        -------
        self.env.now : float
            The current time in the simulation.

        Raises
        ------
        RuntimeError
            If :py:meth:`start` has not been called yet.
        """
        if not self.running:
            raise RuntimeError(
                "Simulation has not been started! Call start() to initialise "
                "the process stack."
            )
        self.env.run(until=until)
        return self.env.now

    def is_finished(self):
        """
        Check if simulation is finished based on the following finish
        conditions:

        * The Buffer is empty (no data sits on the buffer)
        * The Cluster is idle (no tasks are running)
        * The Scheduler is idle (there are no more workflows/tasks queued)
        * The Instrument is idle (i.e. has no more observations left to run)

        Checks are evaluated in that order and stop at the first failure.

        Returns
        -------
        bool
            True if all of the above requirements are met; False otherwise
            (i.e. the simulation is still running).
        """
        return bool(
            self.buffer.is_empty()
            and self.cluster.is_idle()
            and self.scheduler.is_idle()
            and self.instrument.is_idle()
        )

    def summary(self):
        """
        Return the table of events recorded by the monitor during simulation.

        Returns
        -------
        df : pandas.DataFrame
            ``self.monitor.events``.
        """
        # TODO: a formatted summary has not been implemented yet; for now the
        # raw events table is exposed.
        return self.monitor.events

    @staticmethod
    def _split_monolithic_config(json_data):
        """
        Placeholder for splitting a monolithic configuration.

        Currently returns ``json_data`` unchanged.
        """
        return json_data

    def _generate_final_task_data(self):
        """
        Generate a final data frame from the cluster's task dataframe output.

        The frame returned by the cluster's ``finished_task_time_data()`` is
        transposed (presumably so that each task becomes a row; TODO confirm
        against Cluster), then annotated with three constant columns:
        ``planning`` (the planning model's algorithm), ``scheduling`` (the
        scheduler's algorithm) and ``config`` (the configuration path).

        Returns
        -------
        pandas.DataFrame
            The annotated task data, with object columns re-inferred.
        """
        df = self.cluster.finished_task_time_data().T
        df['scheduling'] = str(self.scheduler.algorithm)
        df['planning'] = str(self.planner.model.algorithm)
        df['config'] = str(self._cfg_path)
        return df.infer_objects()

    def _compose_hdf5_output(self, global_df, summary_df):
        """
        Write global simulation, event summary, and parameter data into the
        simulation's HDF5 store.

        Data are stored under ``{timestamp}/{delimiters}/{config name}`` (with
        empty parts omitted and a trailing ``.json`` stripped from the
        configuration file name) using the sub-keys ``sim``, ``summary`` and
        ``params``. The store must already be open.

        Parameters
        ----------
        global_df : :py:obj:`pandas.DataFrame`
            The global overview of the simulation; NaNs are written as 0.
        summary_df : :py:obj:`pandas.DataFrame`
            Information on the major events in each actor.

        Returns
        -------
        :py:obj:`pandas.HDFStore`
            The store that was written to.
        """
        ts = self._timestamp.strftime("%a%y%m%d%H%M%S")
        cfg_name = self._cfg_path.name
        if cfg_name.endswith('.json'):
            cfg_name = cfg_name[:-len('.json')]
        # Skip empty parts (e.g. no delimiters) so the key never contains '//'.
        final_key = '/'.join(
            part for part in (ts, self._delimiters, cfg_name) if part
        )
        self._hdf5_store.put(key=f'{final_key}/sim', value=global_df.fillna(0))
        self._hdf5_store.put(key=f'{final_key}/summary', value=summary_df)
        self._hdf5_store.put(
            key=f'{final_key}/params', value=pd.DataFrame(self.params)
        )
        return self._hdf5_store

    def _stringify_json_data(self, path, relative=True):
        """
        Load a JSON file and return its contents re-serialised as a string.

        Parameters
        ----------
        path : str or Path
            Location of the JSON file.
        relative : bool, optional
            If True (default), ``path`` is resolved relative to the parent
            directory of ``self._cfg.path`` (presumably the configuration
            file's location); otherwise it is used as given.

        Returns
        -------
        jstr : str
            String representation of the JSON-encoded data.

        Raises
        ------
        json.JSONDecodeError
            If the file does not contain valid JSON.
        OSError
            If the file cannot be opened.
        """
        fpath = self._cfg.path.parent / path if relative else path
        # JSON text is UTF-8 (RFC 8259); don't depend on the platform locale.
        with open(fpath, encoding='utf-8') as fp:
            jdict = json.load(fp)
        return json.dumps(jdict)