Source code for topsim.core.instrument

# Copyright (C) 28/1/21 RW Bunney

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

from abc import ABC, abstractmethod
from enum import Enum


class Instrument(ABC):
    """
    Abstract base class for Instrument actors in a TOpSim simulation.

    Concrete instruments (for example
    :py:obj:`~topsim.user.telescope.Telescope`) subclass this and
    implement :py:meth:`run` and :py:meth:`to_df`.

    Parameters
    ----------
    env : :py:obj:`simpy.Environment` object
        The discrete-event simulation environment shared between actors.
    config : :py:obj:`~topsim.core.config.Config` instance
        Wrapper object for the simulation configuration file.
    planner : :py:obj:`~topsim.core.planner.Planner` instance
        The Planner actor for the current simulation.
    scheduler : :py:obj:`~topsim.core.scheduler.Scheduler` instance
        The Scheduler actor for the current simulation.

    Notes
    -----
    This class is a Python :py:obj:`~abc.ABC`: it cannot be instantiated
    until a subclass overrides every abstract method.

    The base ``__init__`` does not store any of its arguments; a subclass
    that needs ``env``, ``scheduler`` etc. must keep its own references.
    The sketches below assume the subclass has done so.

    Suggested decisions to make inside ``run()``:

    1. Check that the observation and the instrument are ready given the
       current instrument demand (``self.capacity`` is a user-defined
       attribute)::

           if observation.is_ready(self.env.now, self.capacity):
               ...

    2. Ask the Scheduler whether the Buffer and Cluster have capacity for
       a new Observation (ingest and storage conditions)::

           self.scheduler.check_ingest_capacity(
               observation, pipelines, max_ingest
           )

    3. If both conditions hold, begin the observation and request an
       ingest allocation through the scheduler::

           self.begin_observation(observation)
           self.env.process(
               self.scheduler.allocate_ingest(
                   observation, pipelines, planner
               )
           )

       :py:meth:`~topsim.core.scheduler.Scheduler.allocate_ingest`
       generates a timeout on the SimPy discrete-event queue, which is
       why it is wrapped in ``env.process``.

    4. Finalise an Observation and initiate the clean-up::

           observation.status = self.finish_observation(observation)

    See Also
    --------
    :py:obj:`~topsim.user.telescope.Telescope`
    """

    def __init__(self, env, config, planner, scheduler):
        """Accept the shared simulation actors; the base class keeps none."""

    @abstractmethod
    def run(self):
        """
        Entry point for the Instrument actor.

        Implementations make their per-timestep decisions and, once those
        are resolved, yield a timeout to signal that one timestep has
        elapsed for the instrument.

        Yields
        ------
        self.env.timeout(x)
            A single simulation timestep of `x` time.
        """

    @abstractmethod
    def to_df(self):
        """
        Produce a `pandas.DataFrame` of output for the Simulation
        :py:obj:`~topsim.core.monitor.Monitor`.
        """
class Observation:
    """
    Record of a single observation to be run on an instrument.

    The object holds the observation's scheduling parameters, a reference
    to its workflow, its run status, and a slot for the plan generated
    for that workflow.

    Parameters
    ----------
    name : str
        Observation name.
    start : int
        Expected start time of the observation (stored as ``est``).
    duration : int
        Expected duration of the observation.
    demand : int
        Expected telescope demand (number of arrays used) during the
        observation.
    workflow : str
        Path to the workflow specification (JSON file).
    data_rate : int
        Expected incoming data rate produced by the observation (GB/s);
        stored as ``ingest_data_rate``.
    timestep : str, optional
        Label for the simulation time unit. Defaults to ``'seconds'``.
    min_resources : int, optional
        Lower resource bound for the observation. Defaults to ``-1``
        (presumably "unspecified" -- confirm against callers).
    max_resources : int, optional
        Upper resource bound for the observation. Defaults to ``-1``
        (presumably "unspecified" -- confirm against callers).

    Attributes
    ----------
    buffer_id : int
        Initialised to ``0``.
    cluster_id : str
        Initialised to ``'default'``.
    est : int
        Expected start time, taken from ``start``.
    ast : int or None
        Actual start time; ``None`` until set by whoever starts the
        observation.
    total_data_size : int
        Total size of the data produced so far; initialised to ``0`` and
        not modified by this class.
    status : RunStatus
        Initialised to ``RunStatus.WAITING``.
    plan : object or None
        Workflow plan for this observation; ``None`` until assigned.
    """

    def __init__(self, name, start, duration, demand, workflow, data_rate,
                 timestep='seconds', min_resources=-1, max_resources=-1):
        # TODO change to self.id
        self.name = name
        self.buffer_id = 0
        self.cluster_id = 'default'
        self.est = start
        self.ast = None
        self.duration = duration
        self.demand = demand
        self.workflow = workflow
        self.total_data_size = 0
        self.ingest_data_rate = data_rate
        self.timestep = timestep
        self.status = RunStatus.WAITING
        self.min_resources = min_resources
        # Previously this value was written to ``min_resources`` a second
        # time, clobbering the minimum and leaving ``max_resources`` unset.
        self.max_resources = max_resources
        self.plan = None

    def is_ready(self, current_time, capacity):
        """
        Report whether the observation can start now.

        Parameters
        ----------
        current_time : int
            The current simulation time (``env.now``).
        capacity : int
            Current capacity of the telescope at this time.

        Returns
        -------
        bool
            True when the expected start time has been reached, the
            telescope capacity covers the observation's demand, and the
            observation is still ``RunStatus.WAITING``; False otherwise.
        """
        return bool(
            self.est <= current_time
            and self.demand <= capacity
            and self.status is RunStatus.WAITING
        )

    def is_finished(self, current_time, telescope_status):
        """
        Check if the observation has finished on the telescope.

        Parameters
        ----------
        current_time : int
            The current simulation time.
        telescope_status : object
            Evaluated for truthiness only; a falsy value means the
            observation is never reported as finished.

        Returns
        -------
        bool
            False if the observation has no actual start time yet.
            Otherwise True when ``current_time`` has reached
            ``ast + duration``, ``telescope_status`` is truthy, and the
            status is not already ``RunStatus.FINISHED``.
        """
        if self.ast is None:
            return False
        return bool(
            current_time >= self.ast + self.duration
            and telescope_status
            and self.status is not RunStatus.FINISHED
        )
class RunStatus(str, Enum):
    """
    Lifecycle state of an observation.

    Members also subclass ``str``, so each one compares equal to its
    plain string value (e.g. ``RunStatus.WAITING == 'WAITING'``).
    """

    # Not yet started.
    WAITING = 'WAITING'
    # Currently in progress.
    RUNNING = 'RUNNING'
    # Completed.
    FINISHED = 'FINISHED'