# Source code for topsim.core.simulation

import os
import logging
import time
import datetime
import json

import pandas as pd

from pathlib import Path
from topsim.core.config import Config
from topsim.core.monitor import Monitor
from topsim.core.scheduler import Scheduler
from topsim.core.cluster import Cluster
from topsim.core.buffer import Buffer
from topsim.core.planner import Planner
from topsim.core.delay import DelayModel

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Minimum wall-clock gap, in seconds (10 minutes), between the progress
# messages that Simulation.start() logs while stepping the simulation.
HEARTBEAT_INT_SECONDS = 60 * 10

class Simulation:
    """
    The Simulation class is a wrapper for all Actors; we start the
    simulation through the simulation class, which in turn invokes the
    initial Actors and monitoring, and provides the conditions for checking
    if the simulation has finished.

    Parameters
    ----------
    env : :py:obj:`simpy.Environment` object
        The discrete-event simulation environment. This is the way TOpSim
        simulation maintains state across the different actors, and
        interfaces with the simpy processes.
    config : Path
        Path to the simulation JSON configuration file.
    instrument : :py:obj:`~topsim.core.instrument.Instrument`
        User-defined implementation of the Instrument class. The class
        itself is passed in (not an instance): the simulation reads
        ``instrument.name`` and then calls
        ``instrument(env=..., config=..., planner=..., scheduler=...)``.
    planning_model : :py:obj:`~topsim.algorithms.planning.Planning` object
        User-defined implementation of the planning algorithm class.
    scheduling : :py:obj:`~topsim.algorithms.scheduling.Algorithm`
        User-defined implementation of the scheduling algorithm
        :py:obj:`abc.ABC`. Its ``ingest_requirements`` attribute is set
        by the simulation from the configuration.
    delay : :py:obj:`~topsim.core.delay.DelayModel`, optional
        Delay model for the simulation. When omitted (or falsy), a
        ``DelayModel(0.0, "normal", DelayModel.DelayDegree.NONE)`` is used.
    timestamp : float, optional
        Optional simulation start-time; this is useful for testing, to
        ensure we name the file and the tests match up. Also useful if you
        do not want to use the time of the simulation as the name. When
        omitted, the current wall-clock time is used.
    to_file : bool, optional
        `True` if the simulation output is to be written to the HDF5 store
        at `hdf5_path`; `False` will return pandas DataFrame objects at the
        completion of :py:meth:`~topsim.core.simulation.Simulation.start`.
    hdf5_path : str or Path, optional
        Location of the HDF5 store. Must be provided when `to_file` is
        `True`.
    use_task_data : bool, optional
        Forwarded to the :py:obj:`~topsim.core.planner.Planner` and
        recorded in the stored simulation parameters.
    use_edge_data : bool, optional
        Forwarded to the :py:obj:`~topsim.core.planner.Planner` and
        recorded in the stored simulation parameters.
    **kwargs
        ``delimiters`` : str used to separate different simulations within
        a single HDF5 store (defaults to an empty string).

    Raises
    ------
    ValueError
        If `to_file` is `True` and `hdf5_path` is `None`.

    Notes
    -----
    If `to_file` is left as `False`, simulation results and output will be
    returned as pandas DataFrames (see
    :py:meth:`~topsim.core.simulation.Simulation.start`). This is designed
    for running multiple simulations, allowing for the appending of
    individual simulation results to a 'global'
    :py:obj:`~pandas.DataFrame`.

    Passing in the option `delimiters` provides a way of differentiating
    between multiple simulations within a single HDF5 store (for example,
    in an experiment). A typical experimental loop may involve the
    following structure:

    >>> for heuristic in list_of_scheduling_heuristics:
    >>>     for algorithm in list_of_planning_algorithms:
    >>>         for cfg in list_of_system_configs:
    >>>             ...
    >>>             delimiter = f'{heuristic}/{algorithm}/{cfg}'

    This means when querying HDF5 output files, the results of each
    simulation can be filtered nicely:

    >>> store = pd.HDFStore('path/to/output.h5')
    >>> # Returns a dataframe of simulation results
    >>> store['heuristic_1/algorithm_3/cfg.json']

    Examples
    --------
    Standard simulation with data frame output

    >>> env = simpy.Environment()
    >>> config = Path('path/to/config')
    >>> instrument = CustomInstrument
    >>> plan = PlanningModel()
    >>> sched = SchedulingModel()
    >>> simulation = Simulation(env, config, instrument, plan, sched)

    If we want delays in the model:

    >>> dm = DelayModel(prob=0.1, dist='normal',
    >>>                 dm=DelayModel.DelayDegree.LOW)
    >>> simulation = Simulation(
    >>>     env, config, instrument, plan, sched, delay=dm
    >>> )

    Running a simulation to completion:

    >>> simulation.start()

    Running a simulation for a specific time period, then resuming:

    >>> simulation.start(runtime=100)
    >>> ### Check current status of simulation
    >>> simulation.resume(until=150)
    """

    def __init__(
        self,
        env,
        config,
        instrument,
        planning_model,
        scheduling,
        delay=None,
        timestamp=None,
        to_file=False,
        hdf5_path=None,
        use_task_data=False,
        use_edge_data=True,
        **kwargs
    ):
        #: :py:obj:`simpy.Environment` object
        self.env = env

        #: :py:obj:`~topsim.core.monitor.Monitor` instance
        # NOTE(review): the Monitor receives the raw timestamp when one is
        # supplied but a datetime object otherwise -- confirm that Monitor
        # accepts both.
        if timestamp is not None:
            self.monitor = Monitor(self, timestamp)
            self._timestamp = datetime.datetime.fromtimestamp(timestamp)
        else:
            self._timestamp = datetime.datetime.now()
            self.monitor = Monitor(self, self._timestamp)

        # Process necessary config files
        self._cfg_path = Path(config)  #: Configuration path

        # Initialise Actor and Resource objects
        self._cfg = Config(config)

        #: :py:obj:`~topsim.core.cluster.Cluster` instance
        self.cluster = Cluster(env, self._cfg)

        if not delay:
            # TODO Have this approach replicated so we don't specify the
            # model outside the simulation.
            delay = DelayModel(0.0, "normal", DelayModel.DelayDegree.NONE)

        #: :py:obj:`~topsim.core.planner.Planner` instance
        self.planner = Planner(
            env, self.cluster, planning_model,
            use_task_data, use_edge_data, delay
        )

        #: :py:obj:`~topsim.core.buffer.Buffer` instance
        self.buffer = Buffer(env, self.cluster, self.planner, self._cfg)

        scheduling_algorithm = scheduling
        scheduling_algorithm.ingest_requirements = self._cfg.get_max_ingest(
            instrument.name
        )

        #: :py:obj:`~topsim.core.scheduler.Scheduler` instance
        self.scheduler = Scheduler(
            env, self.buffer, self.cluster, self.planner,
            scheduling_algorithm
        )

        #: User-defined :py:obj:`~topsim.core.instrument.Instrument` instance
        self.instrument = instrument(
            env=self.env,
            config=self._cfg,
            planner=self.planner,
            scheduler=self.scheduler
        )

        #: :py:obj:`bool` Flag for writing simulation output to file.
        self.to_file = to_file

        # Always define the attribute so later code can test it safely,
        # whichever branch below is taken.
        self._hdf5_store = None
        if self.to_file and hdf5_path:
            try:
                if os.path.exists(hdf5_path):
                    LOGGER.warning(
                        'Output HDF5 path already exists, '
                        'simulation appended to existing file'
                    )
                self._hdf5_store = pd.HDFStore(hdf5_path)
                # The store is re-opened in start() once there is data
                # to write.
                self._hdf5_store.close()
            except Exception as e:
                # Best effort: with no store, start() falls back to
                # returning DataFrames instead of writing to file.
                LOGGER.error('%s', e)
                self._hdf5_store = None
        elif self.to_file and hdf5_path is None:
            raise ValueError(
                'Attempted to initialise Simulation object that outputs '
                'to file without providing file path'
            )
        else:
            LOGGER.info(
                'Simulation output will not be stored directly to file'
            )

        #: Used to separate different simulations in HDF5 output
        self._delimiters = kwargs.get('delimiters', '')

        self.params = {
            "use_task_data": [use_task_data],
            "use_edge_data": [use_edge_data]
        }
        self.running = False

    def start(self, runtime=-1):
        """
        Run the simulation, either for the specified runtime, OR until the
        exit conditions are reached.

        The exit conditions are:

            * There are no more observations to process,
            * There is nothing left in the Buffer
            * The Scheduler is not waiting to allocate machines to resources
            * There are not tasks still running on the cluster.

        Parameters
        ----------
        runtime : int
            Nominated runtime of the simulation. If the simulation length
            is known, pass that as the argument. Any non-positive value
            (typically, just -1) will run the simulation one timestep at a
            time until the exit condition is reached.

        Returns
        -------
        None
            If output was written to the HDF5 store (`to_file` is True and
            the store was created successfully).
        tuple of (pandas.DataFrame, pandas.DataFrame)
            Otherwise: the monitor's global simulation data and the
            per-task data.

        Raises
        ------
        RuntimeError
            If the simulation has already been started.
        """
        if self.running:
            raise RuntimeError(
                "start() has already been called! "
                "Use resume() to continue a simulation that is already in "
                "progress."
            )

        self.running = True
        self.env.process(self.monitor.run())
        self.env.process(self.instrument.run())
        self.env.process(self.cluster.run())
        self.scheduler.start()
        self.env.process(self.scheduler.run())
        self.env.process(self.buffer.run())

        heartbeat = time.monotonic()  # use monotonic to avoid going backward
        if runtime > 0:
            self.env.run(until=runtime)
        else:
            while not self.is_finished():
                self.env.run(self.env.now + 1)
                now = time.monotonic()
                if now - heartbeat > HEARTBEAT_INT_SECONDS:
                    # `datetime` is the module here, so the class must be
                    # named explicitly.
                    wall_time = datetime.datetime.now().strftime(
                        "%Y-%m-%d %H:%M:%S"
                    )
                    LOGGER.info(
                        "Wall time: %s / sim_time: %d",
                        wall_time, self.env.now
                    )
                    heartbeat = now

        LOGGER.info("Simulation Finished @ %s", self.env.now)
        self.monitor.collate_events()

        if self.to_file and self._hdf5_store is not None:
            global_df = self.monitor.df
            summary_df = self.monitor.events
            self._hdf5_store.open()
            try:
                self._compose_hdf5_output(global_df, summary_df)
            finally:
                # Release the file handle even if writing fails.
                self._hdf5_store.close()
            return None
        return self.monitor.df, self._generate_final_task_data()

    def resume(self, until):
        """
        Resume a simulation for a period of time.

        Useful for testing purposes, as we do not re-initialise the process
        calls as is done in
        :py:meth:`~topsim.core.simulation.Simulation.start`.

        Parameters
        ----------
        until : int
            The (non-inclusive) :py:obj:`simpy.Environment.now` timestep
            that we want to continue to in the simulation.

        Returns
        -------
        self.env.now : float
            The current time in the simulation.

        Raises
        ------
        RuntimeError
            If :py:meth:`~topsim.core.simulation.Simulation.start` has not
            been called yet.
        """
        if not self.running:
            raise RuntimeError(
                "Simulation has not been started! Call start() to initialise "
                "the process stack."
            )
        self.env.run(until=until)
        return self.env.now

    def is_finished(self):
        """
        Check if simulation is finished based on the following finish
        conditions:

            * The Instrument is idle (i.e. has no more observations left to
              run)
            * The Cluster is idle (no tasks are running)
            * The Buffer is empty (no data sits on the buffer)
            * The Schedule is idle (there are no more workflows/tasks
              queued)

        It is only when all of these return True that the simulation is
        regarded as finished.

        Returns
        -------
        bool
            True if the above requirements are met; False otherwise
            (i.e. the simulation is still running).
        """
        return bool(
            self.buffer.is_empty()
            and self.cluster.is_idle()
            and self.scheduler.is_idle()
            and self.instrument.is_idle()
        )

    def summary(self):
        """
        Produce a formatted summary of events that occurred during
        simulation.

        NOTE(review): this method is an unfinished stub. It computes a
        value, discards it, and returns nothing; the intended return value
        (a summary DataFrame, per the original docstring) is not
        implemented.
        """
        events = self.monitor.events
        finish = max(events['timestep'], 0)

    # NOTE(review): declared as a staticmethod yet takes ``self`` as its
    # first parameter, and the ``json`` parameter shadows the json module.
    # Signature left untouched for compatibility; it returns its second
    # argument unchanged.
    @staticmethod
    def _split_monolithic_config(self, json):
        return json

    def _generate_final_task_data(self):
        """
        Generate a final data frame from the cluster's task dataframe
        output.

        The cluster's finished-task data is transposed, and three constant
        columns (``scheduling``, ``planning``, ``config``) are added.

        Returns
        -------
        pandas.DataFrame
            The annotated task data, after ``infer_objects()``.
        """
        task_df = self.cluster.finished_task_time_data().T
        n_rows = len(task_df)
        # NOTE(review): the labels look swapped -- 'scheduling' is filled
        # from the planner's model and 'planning' from the scheduler's
        # algorithm. Left as-is because stored results may rely on it;
        # confirm before changing.
        task_df['scheduling'] = [str(self.planner.model.algorithm)] * n_rows
        task_df['planning'] = [str(self.scheduler.algorithm)] * n_rows
        task_df['config'] = [str(self._cfg_path)] * n_rows
        return task_df.infer_objects()

    def _compose_hdf5_output(self, global_df, summary_df):
        """
        Put global simulation, summary, and parameter data into the
        simulation's :py:obj:`pandas.HDFStore`.

        Data is stored beneath the key
        ``<timestamp>/<delimiters>/<config file name without .json>``.

        Parameters
        ----------
        global_df : :py:obj:`pandas.DataFrame`
            The global, per-timestep overview of the simulation.
        summary_df : :py:obj:`pandas.DataFrame`
            Information on the major events in each actor.

        Returns
        -------
        The HDF5 store that was written to.
        """
        ts = self._timestamp.strftime("%a%y%m%d%H%M%S")
        sanitised_path = self._cfg_path.name.replace(".json", '')
        final_key = f'{ts}/{self._delimiters}/{sanitised_path}'

        global_df = global_df.fillna(0)
        self._hdf5_store.put(key=f"{final_key}/sim", value=global_df)
        self._hdf5_store.put(key=f'{final_key}/summary', value=summary_df)
        self._hdf5_store.put(
            key=f'{final_key}/params', value=pd.DataFrame(self.params)
        )
        return self._hdf5_store

    def _stringify_json_data(self, path, relative=True):
        """
        Load a JSON file and return its contents re-serialised as a string.

        Parameters
        ----------
        path : str or Path
            Location of the JSON file.
        relative : bool, optional
            If True (default), `path` is resolved against the parent
            directory of ``self._cfg.path``; otherwise it is used as given.
            NOTE(review): this relies on the Config object exposing a
            ``path`` attribute -- confirm against
            :py:obj:`~topsim.core.config.Config`.

        Returns
        -------
        jstr : str
            String representation of the JSON-encoded data.

        Raises
        ------
        json.JSONDecodeError
            If the file does not contain valid JSON.
        """
        fpath = self._cfg.path.parent / path if relative else path
        with open(fpath) as fp:
            jdict = json.load(fp)
        return json.dumps(jdict)